References
- 씹어먹는 C++ (https://modoocode.com/270)
Contents
- 생산자 / 소비자 패턴
Producer - Consumer 패턴 예시
생산자(Producer) - 소비자(Consumer) 패턴은 멀티 쓰레드 프로그램에서 자주 사용되는 패턴입니다.
여기서 생산자(producer)는 무언가 처리할 일을 받아오는 쓰레드를 의미합니다.
예를 들어, 웹사이트를 다운로드하여 분석하는 프로그램을 만들었다고 가정한다면, 이 경우 웹사이트를 다운로드하는 쓰레드가 생산자가 됩니다.
소비자(consumer)는 받은 일을 처리하는 쓰레드를 의미하며, 앞의 예에서 다운로드 받은 웹페이지를 분석하는 쓰레드가 소비자 역할을 하게 됩니다.
위와 같은 예시를 쓰레드로 구현해보겠습니다.
#include <iostream>
#include <chrono>
#include <mutex>
#include <thread>
#include <queue>
#include <vector>
#include <string>
void producer(std::queue<std::string>* downloaded_pages, std::mutex* m, int index) {
for (int i = 0; i < 5; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(100 * index)); // 다운로드 시간
std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" + std::to_string(index) + ")\n";
m->lock();
downloaded_pages->push(content);
m->unlock();
}
}
void consumer(std::queue<std::string>* downloaded_pages, std::mutex* m, int* num_processed) {
while (*num_processed < 25) { // 처리되는 페이지의 총합
m->lock();
if (downloaded_pages->empty()) { // 다운로드 페이지가 없다면 대기
m->unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 대기시간
continue;
}
// 생산자에서 전달받은 다운로드된 웹페이지를 읽고 큐에서 제거
std::string content = downloaded_pages->front();
downloaded_pages->pop();
(*num_processed)++;
m->unlock();
std::cout << content;
std::this_thread::sleep_for(std::chrono::milliseconds(80)); // 처리 후 대기시간
}
}
int main(void) {
std::queue<std::string> downloaded_pages;
std::mutex m;
std::vector<std::thread> producers;
for (int i = 0; i < 5; i++) {
producers.push_back(std::thread(producer, &downloaded_pages, &m, i + 1));
}
int num_processed = 0;
std::vector<std::thread> consumers;
for (int i = 0; i < 3; i++) {
consumers.push_back(std::thread(consumer, &downloaded_pages, &m, &num_processed));
}
for (int i = 0; i < 5; i++)
producers[i].join();
for (int i = 0; i < 3; i++)
consumers[i].join();
return 0;
}
위 코드를 실행시키면 다음의 결과를 확인할 수 있습니다.
먼저 producer 쓰레드에서는 웹페이지를 계속 다운로드하는 역할을 담당합니다. 이 때, 다운로드한 페이지는 downloaded_pages 라는 큐에 저장되며, 큐를 사용하는 이유는 FIFO 특성을 이용하기 위해서 입니다.
즉, 먼저 다운로드한 페이지를 먼저 처리하기 위해서 입니다.
producer는 실제 다운로드하는 코드는 아니고, 해당 상황을 시뮬레이션하였습니다.
for (int i = 0; i < 5; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(100 * index)); // 다운로드 시간
std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" + std::to_string(index) + ")\n";
m->lock();
downloaded_pages->push(content);
m->unlock();
}
한 웹사이트당 5개의 페이지를 다운받고 있으며, std::this_thread_sleep_for을 통해서 다운로드 시간을 sleep 시켰다가, 다운로드 받은 내용을 content에 저장합니다.
그리고 다운받은 페이지를 작업 큐에 삽입해야하는데, 이때 주의할 점은 producer 쓰레드가 총 5개이기 때문에 downloaded_pages 큐에 접근하는 쓰레드들 사이에서 race condition이 발생할 수 있다는 것입니다.
이를 방지하기 위해서 뮤텍스 m으로 해당 코드를 critical section으로 감싸서 문제가 발생하지 않도록 합니다.
다음은 consumer 입니다.
while (*num_processed < 25) { // 처리되는 페이지의 총합
m->lock();
if (downloaded_pages->empty()) { // 다운로드 페이지가 없다면 대기
m->unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 대기시간
continue;
}
// 생산자에서 전달받은 다운로드된 웹페이지를 읽고 큐에서 제거
std::string content = downloaded_pages->front();
downloaded_pages->pop();
(*num_processed)++;
m->unlock();
std::cout << content;
std::this_thread::sleep_for(std::chrono::milliseconds(80)); // 처리시간
}
consumer 쓰레드 입장에서는 하려고 하는 작업이 언제 들어올 지 알 수가 없습니다. 따라서 downloaded_pages 큐가 비어있지 않을 때 까지 계속 while 루프를 돌게됩니다. 주의해야할 점은 컴퓨터 CPU의 속도에 비해 웹사이트를 다운로드하고 큐에 추가되는 속도가 매우 느리다는 점입니다. producer에서는 약 100ms마다 웹사이트 정보를 큐에 추가하는데, 이시간동안 downloaded_pages->empty() 를 수십만 번 호출할 수도 있습니다. 이는 상당한 CPU 리소스의 낭비일 수 있습니다.
따라서, 위 코드처럼 작업 큐가 비어있다면 10ms 정도 대기했다가 다시 while 루프를 돌게됩니다.
참고로 while문 시작점에서 뮤텍스 m을 lock 해주었기 때문에, continue를 하기 전에 m을 unlock 해주어야 합니다.
이 후 전달받은 웹페이지 content를 처리하는 부분은 간단합니다. 큐의 front 함수를 통해서 먼저 들어온 정보를 읽고, 큐에서 제거해줍니다. 그리고 다른 쓰레드에서 바로 처리할 수 있도록 뮤텍스를 unlock 해줍니다. 웹페이지의 처리시간은 약 80ms로 가정했습니다.
condition_variable
위에서 consumer 쓰레드는 10ms마다 downloaded_pages 큐에 할 일이 있는지 확인하고 없으면 다시 기다리는 형태로 구현되어 있습니다.
이는 매우 비효율적인 구조합니다. 매번 언제 들어올 지 모르는 데이터를 확인하기 위해서 지속적으로 뮤텍스를 lock하고 큐를 확인해야 하기 때문입니다. 차라리 producer에서 데이터가 뜸하게 들어오는 것을 안다면 consumer는 아예 sleep 상태로 만들어놓고, producer에서 데이터가 온다면 consumer를 깨우는 방식이 더 좋아보입니다.
(여기서는 producer/consumer 쓰레드 뿐이지만, 더 큰 프로그램에서 사용하지 않는 쓰레드를 sleep 시키면 다른 쓰레드들이 CPU를 더 효율적으로 사용할 수 있을 것입니다.)
C++에서는 이를 위해서 condition_variable(조건 변수)를 제공하고 있습니다.
condition_variable을 사용해서 위 예제를 다음과 같이 구현할 수 있습니다.
#include <iostream>
#include <chrono>
#include <mutex>
#include <thread>
#include <queue>
#include <vector>
#include <string>
#include <condition_variable>
void producer(std::queue<std::string>* downloaded_pages, std::mutex* m, int index, std::condition_variable* cv) {
for (int i = 0; i < 5; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(100 * index)); // 다운로드 시간
std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" + std::to_string(index) + ")\n";
m->lock();
downloaded_pages->push(content);
m->unlock();
cv->notify_one(); // consumer에게 content가 준비됨을 알림
}
}
void consumer(std::queue<std::string>* downloaded_pages, std::mutex* m, int* num_processed, std::condition_variable* cv) {
while (*num_processed < 25) { // 처리되는 페이지의 총합
std::unique_lock<std::mutex> lk(*m);
cv->wait(lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; });
if (*num_processed == 25) {
lk.unlock();
return;
}
// 생산자에서 전달받은 다운로드된 웹페이지를 읽고 큐에서 제거
std::string content = downloaded_pages->front();
downloaded_pages->pop();
(*num_processed)++;
lk.unlock();
std::cout << content;
std::this_thread::sleep_for(std::chrono::milliseconds(80)); // 처리 후 대기시간
}
}
int main(void) {
std::queue<std::string> downloaded_pages;
std::mutex m;
std::condition_variable cv;
std::vector<std::thread> producers;
for (int i = 0; i < 5; i++) {
producers.push_back(std::thread(producer, &downloaded_pages, &m, i + 1, &cv));
}
int num_processed = 0;
std::vector<std::thread> consumers;
for (int i = 0; i < 3; i++) {
consumers.push_back(std::thread(consumer, &downloaded_pages, &m, &num_processed, &cv));
}
for (int i = 0; i < 5; i++)
producers[i].join();
// sleep 상태인 쓰레드들을 모두 깨움
cv.notify_all();
for (int i = 0; i < 3; i++)
consumers[i].join();
return 0;
}
먼저 뮤텍스처럼 condition_variable 을 정의합니다.
condition_variable cv;
그리고 consumer부터 어떻게 사용되는지 살펴보겠습니다.
while (*num_processed < 25) { // 처리되는 페이지의 총합
std::unique_lock<std::mutex> lk(*m);
cv->wait(lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; });
if (*num_processed == 25) {
lk.unlock();
return;
}
// 생산자에서 전달받은 다운로드된 웹페이지를 읽고 큐에서 제거
std::string content = downloaded_pages->front();
downloaded_pages->pop();
(*num_processed)++;
lk.unlock();
std::cout << content;
std::this_thread::sleep_for(std::chrono::milliseconds(80)); // 처리 후 대기시간
}
cv->wait를 사용해서 어떠한 조건이 참이 될 때까지 기다리도록 하는데, 해당 조건은 wait 함수의 인자로 전달됩니다.
여기서는
cv->wait(lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; });
위와 같이 함수 인자를 전달했는데, wait 함수의 첫 번째 인자는 unique_lock 입니다.
그리고, 두 번째 파라미터는 Callable 객체를 인자로 받습니다. 여기서 사용된 [&] { return !downloaded_pages->empty() || *num_processed == 25; } 는 람다 함수로 callable 객체가 됩니다.
람다 함수로 전달된 조건은 downloaded_pages에 원소들이 있거나, 전체 처리된 페이지의 개수가 25개일 때, wait를 중지하도록 했습니다. 이 조건 변수는 인자로 전달받은 조건이 거짓이라면, lk를 unlock 한 뒤에 영원히 sleep 상태를 유지하게 됩니다. 이 때, 이 쓰레드는 다른 누가 깨워주기 전까지는 계속 sleep 상태를 유지합니다.
(lk을 unlock한 뒤에 sleep 상태로 빠지게 됩니다.)
if (*num_processed == 25) {
lk.unlock();
return;
}
cv->wait 함수를 탈출한 이후에 위 코드는 wait에서 탈출한 이유가 모든 페이지를 처리해서인지, downloaded_pages에 페이지가 추가되어서 인지를 알 수 없기 때문에 추가되었습니다. 만약 모든 페이지를 아직 처리하지 않았는데, sleep에서 탈출했다면 계속 작업을 수행해야하고, 그렇지 않다면 쓰레드를 종료해야 합니다.
다음으로 producer를 살펴보겠습니다.
for (int i = 0; i < 5; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(100 * index)); // 다운로드 시간
std::string content = "웹사이트 : " + std::to_string(i) + " from thread(" + std::to_string(index) + ")\n";
m->lock();
downloaded_pages->push(content);
m->unlock();
cv->notify_one(); // consumer에게 content가 준비됨을 알림
}
다른 부분은 다 동일하지만, 마지막 부분에서 cv->notify_one()이 추가되었습니다. 페이지 다운로드를 완료했다면, sleep 상태인 쓰레들 중에 하나를 깨워야하는데, 이때 notify_one 함수가 사용됩니다. 이 함수는 조건을 만족하지 않아서 sleep 상태에 빠진 쓰레들 중에 하나를 깨워서 조건을 다시 검사하게 합니다. 만약 조건이 참이라면 그 쓰레드는 계속해서 작업을 수행하게 됩니다.
한 가지 main 문에서 살펴봐야할 코드는 다음과 같습니다.
// sleep 상태인 쓰레드들을 모두 깨움
cv.notify_all();
for (int i = 0; i < 3; i++)
consumers[i].join();
producer 쓰레드에서 페이지 다운로드를 모두 완료했지만, 그 시점에서 sleep 상태에 빠진 일부 consumer 쓰레드가 존재할 것입니다. 만약 다른 처리를 해주지 않는다면, consumer 쓰레드는 끝까지 join 되지 않는 상태가 발생할 수 있습니다.
따라서, producer 쓰레드들이 모두 join한 후에, notify_all() 이라는 함수를 통해서 sleep 상태에 빠진 모든 consumer 쓰레드를 깨워서 조건을 다시 검사하도록 해줍니다. 해당 시점에서 producer 쓰레드의 모든 작업이 끝났기 때문에 consumer 쓰레드들도 종료하게 될 것이고 프로그램이 종료될 것입니다.
이상 생산자(producer) - 소비자(consumer) 패턴에 대한 내용이었습니다.
'프로그래밍 > C & C++' 카테고리의 다른 글
[C++] Move Semantics (0) | 2021.08.12 |
---|---|
[C++] 우측값 참조(rvalue reference) (0) | 2021.08.11 |
[C++] mutex (0) | 2021.08.07 |
[C++] thread (0) | 2021.08.07 |
[C++] 스마트 포인터(Smart Pointer) - (2) (3) | 2021.08.03 |
댓글