본문 바로가기
프로그래밍/C & C++

[C++] 생산자(Producer) / 소비자(Consumer) 패턴

by 별준 2021. 8. 9.

References

Contents

  • 생산자 / 소비자 패턴

Producer - Consumer 패턴 예시

생산자(Producer) - 소비자(Consumer) 패턴은 멀티 쓰레드 프로그램에서 자주 사용되는 패턴입니다.

 

여기서 생산자(producer)는 무언가 처리할 일을 받아오는 쓰레드를 의미합니다. 

예를 들어, 웹사이트를 다운로드하여 분석하는 프로그램을 만들었다고 가정한다면, 이 경우 웹사이트를 다운로드하는 쓰레드가 생산자가 됩니다.

 

소비자(consumer)는 받은 일을 처리하는 쓰레드를 의미하며, 앞의 예에서 다운로드 받은 웹페이지를 분석하는 쓰레드가 소비자 역할을 하게 됩니다.

 

위와 같은 예시를 쓰레드로 구현해보겠습니다.

#include <iostream>
#include <chrono>
#include <mutex>
#include <thread>
#include <queue>
#include <vector>
#include <string>

void producer(std::queue<std::string>* downloaded_pages, std::mutex* m,	int index) {
	for (int i = 0; i < 5; i++) {
		std::this_thread::sleep_for(std::chrono::milliseconds(100 * index)); // 다운로드 시간
		std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" + std::to_string(index) + ")\n";

		m->lock();
		downloaded_pages->push(content);
		m->unlock();
	}
}

void consumer(std::queue<std::string>* downloaded_pages, std::mutex* m,	int* num_processed) {
	while (*num_processed < 25) { // 처리되는 페이지의 총합
		m->lock();
		if (downloaded_pages->empty()) { // 다운로드 페이지가 없다면 대기
			m->unlock();

			std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 대기시간
			continue;
		}

		// 생산자에서 전달받은 다운로드된 웹페이지를 읽고 큐에서 제거
		std::string content = downloaded_pages->front();
		downloaded_pages->pop();

		(*num_processed)++;
		m->unlock();

		std::cout << content;
		std::this_thread::sleep_for(std::chrono::milliseconds(80)); // 처리 후 대기시간
	}
}

int main(void) {
	std::queue<std::string> downloaded_pages;
	std::mutex m;

	std::vector<std::thread> producers;
	for (int i = 0; i < 5; i++) {
		producers.push_back(std::thread(producer, &downloaded_pages, &m, i + 1));
	}

	int num_processed = 0;
	std::vector<std::thread> consumers;
	for (int i = 0; i < 3; i++) {
		consumers.push_back(std::thread(consumer, &downloaded_pages, &m, &num_processed));
	}

	for (int i = 0; i < 5; i++)
		producers[i].join();
	for (int i = 0; i < 3; i++)
		consumers[i].join();

	return 0;
}

위 코드를 실행시키면 다음의 결과를 확인할 수 있습니다.

먼저 producer 쓰레드에서는 웹페이지를 계속 다운로드하는 역할을 담당합니다. 이 때, 다운로드한 페이지는 downloaded_pages 라는 큐에 저장되며, 큐를 사용하는 이유는 FIFO 특성을 이용하기 위해서 입니다.

즉, 먼저 다운로드한 페이지를 먼저 처리하기 위해서 입니다.

 

producer는 실제 다운로드하는 코드는 아니고, 해당 상황을 시뮬레이션하였습니다.

for (int i = 0; i < 5; i++) {
	std::this_thread::sleep_for(std::chrono::milliseconds(100 * index)); // 다운로드 시간
	std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" + std::to_string(index) + ")\n";

	m->lock();
	downloaded_pages->push(content);
	m->unlock();
}

한 웹사이트당 5개의 페이지를 다운받고 있으며, std::this_thread_sleep_for을 통해서 다운로드 시간을 sleep 시켰다가, 다운로드 받은 내용을 content에 저장합니다. 

그리고 다운받은 페이지를 작업 큐에 삽입해야하는데, 이때 주의할 점은 producer 쓰레드가 총 5개이기 때문에 downloaded_pages 큐에 접근하는 쓰레드들 사이에서 race condition이 발생할 수 있다는 것입니다.

 

이를 방지하기 위해서 뮤텍스 m으로 해당 코드를 critical section으로 감싸서 문제가 발생하지 않도록 합니다.

 

다음은 consumer 입니다.

while (*num_processed < 25) { // 처리되는 페이지의 총합
	m->lock();
	if (downloaded_pages->empty()) { // 다운로드 페이지가 없다면 대기
		m->unlock();

		std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 대기시간
		continue;
	}

	// 생산자에서 전달받은 다운로드된 웹페이지를 읽고 큐에서 제거
	std::string content = downloaded_pages->front();
	downloaded_pages->pop();

	(*num_processed)++;
	m->unlock();

	std::cout << content;
	std::this_thread::sleep_for(std::chrono::milliseconds(80)); // 처리시간
}

consumer 쓰레드 입장에서는 하려고 하는 작업이 언제 들어올 지 알 수가 없습니다. 따라서 downloaded_pages 큐가 비어있지 않을 때 까지 계속 while 루프를 돌게됩니다. 주의해야할 점은 컴퓨터 CPU의 속도에 비해 웹사이트를 다운로드하고 큐에 추가되는 속도가 매우 느리다는 점입니다. producer에서는 약 100ms마다 웹사이트 정보를 큐에 추가하는데, 이시간동안 downloaded_pages->empty() 를 수십만 번 호출할 수도 있습니다. 이는 상당한 CPU 리소스의 낭비일 수 있습니다. 

따라서, 위 코드처럼 작업 큐가 비어있다면 10ms 정도 대기했다가 다시 while 루프를 돌게됩니다.

참고로 while문 시작점에서 뮤텍스 m을 lock 해주었기 때문에, continue를 하기 전에 m을 unlock 해주어야 합니다.

 

이 후 전달받은 웹페이지 content를 처리하는 부분은 간단합니다. 큐의 front 함수를 통해서 먼저 들어온 정보를 읽고, 큐에서 제거해줍니다. 그리고 다른 쓰레드에서 바로 처리할 수 있도록 뮤텍스를 unlock 해줍니다. 웹페이지의 처리시간은 약 80ms로 가정했습니다.


condition_variable

위에서 consumer 쓰레드는 10ms마다 downloaded_pages 큐에 할 일이 있는지 확인하고 없으면 다시 기다리는 형태로 구현되어 있습니다.

 

이는 매우 비효율적인 구조합니다. 매번 언제 들어올 지 모르는 데이터를 확인하기 위해서 지속적으로 뮤텍스를 lock하고 큐를 확인해야 하기 때문입니다. 차라리 producer에서 데이터가 뜸하게 들어오는 것을 안다면 consumer는 아예 sleep 상태로 만들어놓고, producer에서 데이터가 온다면 consumer를 깨우는 방식이 더 좋아보입니다. 

(여기서는 producer/consumer 쓰레드 뿐이지만, 더 큰 프로그램에서 사용하지 않는 쓰레드를 sleep 시키면 다른 쓰레드들이 CPU를 더 효율적으로 사용할 수 있을 것입니다.)

 

C++에서는 이를 위해서 condition_variable(조건 변수)를 제공하고 있습니다.

condition_variable을 사용해서 위 예제를 다음과 같이 구현할 수 있습니다.

#include <iostream>
#include <chrono>
#include <mutex>
#include <thread>
#include <queue>
#include <vector>
#include <string>
#include <condition_variable>

void producer(std::queue<std::string>* downloaded_pages, std::mutex* m,	int index, std::condition_variable* cv) {
	for (int i = 0; i < 5; i++) {
		std::this_thread::sleep_for(std::chrono::milliseconds(100 * index)); // 다운로드 시간
		std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" + std::to_string(index) + ")\n";

		m->lock();
		downloaded_pages->push(content);
		m->unlock();

		cv->notify_one(); // consumer에게 content가 준비됨을 알림
	}
}

void consumer(std::queue<std::string>* downloaded_pages, std::mutex* m,	int* num_processed, std::condition_variable* cv) {
	while (*num_processed < 25) { // 처리되는 페이지의 총합
		std::unique_lock<std::mutex> lk(*m);

		cv->wait(lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; });

		if (*num_processed == 25) {
			lk.unlock();
			return;
		}

		// 생산자에서 전달받은 다운로드된 웹페이지를 읽고 큐에서 제거
		std::string content = downloaded_pages->front();
		downloaded_pages->pop();

		(*num_processed)++;
		lk.unlock();

		std::cout << content;
		std::this_thread::sleep_for(std::chrono::milliseconds(80)); // 처리 후 대기시간
	}
}

int main(void) {
	std::queue<std::string> downloaded_pages;
	std::mutex m;
	std::condition_variable cv;

	std::vector<std::thread> producers;
	for (int i = 0; i < 5; i++) {
		producers.push_back(std::thread(producer, &downloaded_pages, &m, i + 1, &cv));
	}

	int num_processed = 0;
	std::vector<std::thread> consumers;
	for (int i = 0; i < 3; i++) {
		consumers.push_back(std::thread(consumer, &downloaded_pages, &m, &num_processed, &cv));
	}

	for (int i = 0; i < 5; i++)
		producers[i].join();

	// sleep 상태인 쓰레드들을 모두 깨움
	cv.notify_all();

	for (int i = 0; i < 3; i++)
		consumers[i].join();

	return 0;
}

 

먼저 뮤텍스처럼 condition_variable 을 정의합니다.

condition_variable cv;

 

그리고 consumer부터 어떻게 사용되는지 살펴보겠습니다.

while (*num_processed < 25) { // 처리되는 페이지의 총합
	std::unique_lock<std::mutex> lk(*m);

	cv->wait(lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; });

	if (*num_processed == 25) {
		lk.unlock();
		return;
	}

	// 생산자에서 전달받은 다운로드된 웹페이지를 읽고 큐에서 제거
	std::string content = downloaded_pages->front();
	downloaded_pages->pop();

	(*num_processed)++;
	lk.unlock();

	std::cout << content;
	std::this_thread::sleep_for(std::chrono::milliseconds(80)); // 처리 후 대기시간
}

cv->wait를 사용해서 어떠한 조건이 참이 될 때까지 기다리도록 하는데, 해당 조건은 wait 함수의 인자로 전달됩니다.

여기서는

cv->wait(lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; });

위와 같이 함수 인자를 전달했는데, wait 함수의 첫 번째 인자는 unique_lock 입니다.

그리고, 두 번째 파라미터는 Callable 객체를 인자로 받습니다. 여기서 사용된 [&] { return !downloaded_pages->empty() || *num_processed == 25; } 는 람다 함수로 callable 객체가 됩니다.

 

람다 함수로 전달된 조건은 downloaded_pages에 원소들이 있거나, 전체 처리된 페이지의 개수가 25개일 때, wait를 중지하도록 했습니다. 이 조건 변수는 인자로 전달받은 조건이 거짓이라면, lk를 unlock 한 뒤에 영원히 sleep 상태를 유지하게 됩니다. 이 때, 이 쓰레드는 다른 누가 깨워주기 전까지는 계속 sleep 상태를 유지합니다.

(lk을 unlock한 뒤에 sleep 상태로 빠지게 됩니다.)

 

if (*num_processed == 25) {
    lk.unlock();
    return;
}

cv->wait 함수를 탈출한 이후에 위 코드는 wait에서 탈출한 이유가 모든 페이지를 처리해서인지, downloaded_pages에 페이지가 추가되어서 인지를 알 수 없기 때문에 추가되었습니다. 만약 모든 페이지를 아직 처리하지 않았는데, sleep에서 탈출했다면 계속 작업을 수행해야하고, 그렇지 않다면 쓰레드를 종료해야 합니다.

 

다음으로 producer를 살펴보겠습니다.

for (int i = 0; i < 5; i++) {
	std::this_thread::sleep_for(std::chrono::milliseconds(100 * index)); // 다운로드 시간
	std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" + std::to_string(index) + ")\n";

	m->lock();
	downloaded_pages->push(content);
	m->unlock();

	cv->notify_one(); // consumer에게 content가 준비됨을 알림
}

다른 부분은 다 동일하지만, 마지막 부분에서 cv->notify_one()이 추가되었습니다. 페이지 다운로드를 완료했다면, sleep 상태인 쓰레들 중에 하나를 깨워야하는데, 이때 notify_one 함수가 사용됩니다. 이 함수는 조건을 만족하지 않아서 sleep 상태에 빠진 쓰레들 중에 하나를 깨워서 조건을 다시 검사하게 합니다. 만약 조건이 참이라면 그 쓰레드는 계속해서 작업을 수행하게 됩니다.

 

한 가지 main 문에서 살펴봐야할 코드는 다음과 같습니다.

// sleep 상태인 쓰레드들을 모두 깨움
cv.notify_all();

for (int i = 0; i < 3; i++)
	consumers[i].join();

producer 쓰레드에서 페이지 다운로드를 모두 완료했지만, 그 시점에서 sleep 상태에 빠진 일부 consumer 쓰레드가 존재할 것입니다. 만약 다른 처리를 해주지 않는다면, consumer 쓰레드는 끝까지 join 되지 않는 상태가 발생할 수 있습니다.

따라서, producer 쓰레드들이 모두 join한 후에, notify_all() 이라는 함수를 통해서 sleep 상태에 빠진 모든 consumer 쓰레드를 깨워서 조건을 다시 검사하도록 해줍니다. 해당 시점에서 producer 쓰레드의 모든 작업이 끝났기 때문에 consumer 쓰레드들도 종료하게 될 것이고 프로그램이 종료될 것입니다.

 

 

이상 생산자(producer) - 소비자(consumer) 패턴에 대한 내용이었습니다.

'프로그래밍 > C & C++' 카테고리의 다른 글

[C++] Move Semantics  (0) 2021.08.12
[C++] 우측값 참조(rvalue reference)  (0) 2021.08.11
[C++] mutex  (0) 2021.08.07
[C++] thread  (0) 2021.08.07
[C++] 스마트 포인터(Smart Pointer) - (2)  (3) 2021.08.03

댓글