본문 바로가기
프로그래밍/C & C++

[C++] 멀티스레딩 프로그래밍 (1)

by 별준 2022. 3. 3.

References

Contents

  • 멀티스레드 프로그래밍 개념
  • Thread
  • Atomic Operations Library

이번 포스팅에서는 C++의 멀티스레딩 프로그래밍에 대해서 알아보려고 합니다.

[C++] thread

[C++] mutex

[C++] 생산자(Producer) / 소비자(Consumer) 패턴

[C++] 비동기(Asynchronous) 실행

예전에 위의 포스팅들을 통해서 살펴봤었는데, 이번 포스팅을 통해서 전체적으로 정리해보려고 합니다.

 

멀티스레딩 프로그래밍(multithreaded programming)은 프로세서 유닛이 여러 개 장착된 컴퓨터 시스템에서 중요한 기법이며, 이를 이용하여 시스템에 있는 여러 프로세서 유닛을 병렬로 사용하는 프로그램을 작성할 수 있습니다.

이처럼 프로세서의 기능을 최대한 활용할 수 있도록 멀티스레드 코드를 정확하게 작성할 줄 알아야 하는데, 멀티스레드 어플리케이션은 플랫폼이나 OS에서 제공하는 API에 상당히 의존합니다. 그래서 멀티스레드 코드를 플랫폼 독립적으로 작성하기는 힘듭니다. C++11부터 제공되는 표준 스레딩 라이브러리를 활용하면 이를 어느정도 해결할 수 있습니다.

 

멀티스레드 프로그래밍을 플랫폼 독립적으로 할 수 있게 해주는 서드파티 라이브러리(ex, pthreads와 boost::thread)도 있습니다만, 이 라이브러리는 C++에 속하지는 않습니다.

 


1. 멀티스레드 프로그래밍 개념

멀티스레드 프로그래밍을 사용하면 여러 연산을 병렬로 처리할 수 있습니다. 그래서 현재는 거의 모든 시스템에 장착된 멀티 프로세서를 최대한 활용할 수 있습니다.

 

C++98/03 버전은 멀티스레드 프로그래밍을 지원하지 않아서 서드파티 라이브러리나 타겟 시스템의 OS에서 제공하는 멀티스레드 API를 활용하는 수밖에 없었습니다. C++11부터 표준 멀티 스레드 라이브러리가 추가되면서 크로스 플랫폼 멀티스레드 프로그램을 작성하기 한결 쉬워졌습니다. 현재 C++ 표준은 GPU를 제외한 CPU만을 대상으로 API를 정의하고 있지만, 향후 GPU도 지원하도록 개선되지 않을까 가능성을 열어두고 있습니다.

 

멀티스레드 프로그래밍이 필요한 이유는 크게 두 가지가 있습니다. 첫째, 주어진 연산 작업을 작은 문제들로 나눠서 각각을 멀티프로세서 시스템에서 병렬로 실행하면 전반적인 성능을 크게 높일 수 있습니다.

둘째, 연산을 다른 관점에서 모듈화할 수 있습니다. 예를 들어 연산을 UI 스레드에 종속적이지 않은 독립 스레드로 분리해서 구현하면 처리 시간이 긴 연산을 백그라운드로 실행시키는 방식으로 UI의 응답 속도를 높일 수 있습니다.

 

다음 그림은 병렬 처리가 절대적으로 유리한 상황을 보여주고 있습니다.

싱글 코어 프로세서에서는 모든 부분을 순차적으로 실행해야 하고, 듀얼 코어 프로세서에서는 두 부분씩 동시에 실행할 수 있고, 쿼드 코어 프로세서에서는 각 부분을 동시에 실행할 수 있습니다. 이처럼 성능이 코어 수에 정비례합니다.

 

물론 항상 이렇게 독립 작업으로 나눠서 병렬화할 수 있는 것은 아닙니다. 그래도 최소한 일부분만이라도 병렬화할 수 있다면 조금이라도 성능을 높일 수 있습니다. 멀티스레드 프로그래밍을 하는 데 어려운 부분은 병렬 알고리즘을 고안하는 것입니다. 처리할 작업의 성격에 따라 구현 방식이 크게 달라지기 때문입니다. 또한 경쟁 상태(race condition), 교착 상태(deadlocks), 테어링(tearing), 거짓 공유(false-sharing) 등과 같은 문제가 발생하지 않게 만드는 것도 쉽지 않습니다.

이어지는 내용에서 이러한 문제점에 대해서 간단하게 살펴보겠습니다. 이런 문제는 주로 아토믹과 명시적인 동기화 메커니즘으로 해결하는데, 문제점에 대한 해결 방법도 뒤에서 살펴보도록 하겠습니다.

 

1.1 Race Conditions

여러 스레드가 공유 리소스를 동시에 접근할 때 경쟁 상태(race condition)가 발생할 수 있습니다. 그중에서도 공유 메모리에 대한 경쟁 상태를 흔히 데이터 경쟁(data races)이라고 부릅니다. 데이터 경쟁은 여러 스레드가 공유 메모리에 동시에 접근할 수 있는 상태에서 최소 하나의 스레드가 그 메모리에 데이터를 쓸 때 발생합니다.

 

예를 들어 공유 변수가 하나 있는데 어떤 스레드는 이 값을 증가시키고, 또 어떤 스레드는 이 값을 감소시키는 경우를 생각해봅시다. 값을 증가시키거나 감소하려면 현재 값을 메모리에 읽어서 증가나 감소 연산을 수행해야 합니다.

PDP-11이나 VAX와 같은 예전 아키텍처는 아토믹(atomic)하게 실행되는 INC와 같은 인스트럭션을 제공했습니다. 하지만 최신 x86 프로세서에서 제공하는 INC는 더 이상 아토믹하지 않습니다. 다시 말해 INC를 처리하는 도중에 다른 인스트럭션이 실행될 수 있기 때문에 결과가 얼마든지 달라질 수 있습니다.

 

다음 표는 초기값이 1일 때 감소 연산이 실행되기 전에 증가 연산을 마치는 경우를 보여줍니다.

위의 경우 메모리에 기록되는 최종 결과는 1입니다.

이와 반대로 다음 표와 같이 증가 연산을 수행하는 스레드가 시작하기 전에 감소연산을 수행하는 스레드가 작업을 모두 마쳐서 최종 결과는 1이 됩니다.

하지만 두 작업이 다음 표와 같이 서로 엇갈리게 되면 결과가 달라집니다.

이렇게 되면 최종 결과는 0이 됩니다. 다시 말해 증가 연산의 효과가 사라지는 것입니다. 이러한 현상을 데이터 경쟁이라고 부릅니다.

 

1.2 Tearing

테어링(tearing)이란 데이터 경쟁의 특수한 경우로서, 크게 torn read와 torn write의 두 가지가 있습니다. 어떤 스레드가 메모리에 데이터의 일부만 쓰고 나머지 부분을 미처 쓰지 못한 상태에서 다른 스레드가 이 데이터를 읽으면 두 스레드가 보는 값이 달라집니다. 이를 torn read라고 합니다. 또한 두 스레드가 이 데이터에 동시에 쓸 때 한 스레드는 그 데이터의 한쪽 부분을 쓰고, 다른 스레드 는그 데이터의 다른 부분을 썼다면 각자 수행한 결과가 달라지는데, 이를 torn write라고 합니다.

 

1.3 Deadlocks

경쟁 상태를 막기 위해 상호 배재(mutual exlusion)와 같은 동기화 기법을 적용하다 보면 멀티스레드 프로그래밍에서 흔히 발생하는 또 다른 문제인 데드락(deadlocks, 교착상태)에 부딪히기 쉽습니다. 데드락이란 여러 스레드가 서로 상대방 작업이 끝날 때까지 동시에 기다리는 상태를 말합니다.

예를 들어 두 스레드가 공유 리소스를 서로 접근하려면 먼저 그 리소스에 대한 접근 권한 요청부터 해야합니다. 현재 둘 중 한 스레드가 그 리소스에 대한 접근 권한을 확보한 상태로 계속 머물러 있으면 그 리소스에 대한 접근 권한을 요청하는 다른 스레드도 무한히 기다려야 합니다.

 

이때 공유 리소스에 대한 접근 권한을 얻는 방법에는 뮤텍스라는 것이 있습니다 (뒤에서 뮤텍스에 관해서 설명하도록 하겠습니다). 예를 들어 스레드가 두 개 있고 리소스도 두 개 있을 때 이를 A와 B라는 뮤텍스 객체로 보호하고 있다고 해봅시다. 이때 두 스레드가 각 리소스에 대한 접근 권한을 얻을 수 있지만, 그 순서는 다음 표와 같이 서로 다른 경우들에 대해 살펴보도록 하겠습니다.

이 스레드가 실행되면 다음의 순서로 진행될 수 있습니다.

  • Thread 1: A 확보
  • Thread 2: B 확보
  • Thread 1: B 확보 (Thread 2가 B를 확보하고 있기 때문에 대기)
  • Thread 2: A 확보 (Thread 1이 A를 확보하고 있기 때문에 대기)

이렇게 되면 두 스레드 모두 상대방을 무한정 기다리는 데드락이 발생합니다. 이러한 드데락 상황을 그림으로 표현하면 아래와 같습니다.

스레드 1은 A 리소스에 대한 접근 권한을 확보한 상태에서 B 리소스의 접근 권한을 얻을 때까지 기다리고, 스레드 2는 B 리소스의 접근 권한을 확보한 상태에서 A 리소스의 접근 권한을 얻을 때까지 기다립니다. 위 그림을 보면 데드락 상황이 순환 관계를 이루고 있으며, 결국 두 스레드는 서로를 무한정 기다리게 됩니다.

 

이러한 데드락이 발생하지 않게 하려면 모든 스레드가 일정한 순서로 리소스를 획득해야 합니다. 또한 데드락이 발생해도 빠져나올 수 있는 메커니즘을 함께 구현하면 좋습니다. 한 가지 방법은 리소스 접근 권한을 요청하는 작업에 시간제한을 걸어두는 것입니다. 그래서 주어진 시간 안에 리소스를 확보할 수 없으면 더 이상 기다리지 않고 현재 확보한 권한을 해제합니다. 이렇게 하면 다른 스레드가 리소스에 접근할 기회를 줄 수 있습니다. 물론 이 기법만으로 문제를 해결할 수 있는지는 주어진 데드락 상황에 따라 다릅니다.

 

방금 언급한 우회 기법으로 해결하는 것보다는 데드락 상황 자체를 아예 발생하지 않도록 하는 것이 좋은데, 여러 뮤텍스 객체로 보호받고 있는 리소스 집합에 대해 접근 권한을 얻을 때는 리소스마다 접근 권한을 개별적으로 요청하지 않고 std::lock()이나 std::try_lock()과 같은 함수를 사용하는 것이 좋습니다. 이 함수들은 여러 리소스에 대한 권한을 한 번에 확보하거나 요청합니다.

 

1.4 False-Sharing

대부분 캐시(cache)는 캐시 라인(cache line) 단위로 처리됩니다. 최신 CPU는 흔히 64바이트 캐시 라인으로 구성됩니다. 캐시 라인에 데이터를 쓰려면 반드시 그 라인 전체에 락을 걸어야 합니다. 멀티 스레드 코드를 실행할 때 데이터 구조를 잘 만들지 않으면 캐시 라인에 락을 거는 과정에서 성능이 크게 떨어질 수 있습니다.

 

예를 들어, 두 스레드가 두 가지 데이터 영역을 사용하는데, 데이터가 같은 캐시 라인에 걸쳐있는 경우를 생각해봅시다. 이때 한 스레드가 데이터를 업데이트하면 캐시 라인 전체에 락을 걸어버리기 때문에 다른 스레드는 기다려야 합니다. 아래 그림은 두 스레드가 다른 메모리 블럭을 쓰지만 같은 캐시 라인을 공유하는 모습을 보여줍니다.

캐시 라인에 걸쳐 있지 않도록 데이터 구조가 저장될 메모리 영역을 명시적으로 정렬하면 여러 스레드가 접근할 때 대기하지 않게 만들 수 있습니다. 이러한 코드를 이식하기 좋게 작성할 수 있도록 C++17부터 <new> 헤더 파일에 hardware_destructive_interference_size란 상수가 추가되었습니다. 이 상수는 동시에 접근하는 두 객체가 캐시 라인을 공유하지 않도록 최소한의 오프셋을 제시해줍니다. 이 값과 alignas 키워드를 사용하여 데이터를 적절히 정렬할 수 있습니다.

 


2. Threads

<thread> 헤더 파일에 정의된 C++ 스레드 라이브러리를 사용하면 스레드를 매우 간편하게 생성할 수 있습니다. 이때 새로 만든 스레드가 할 일을 지정하는 방식은 다양합니다. 전역 함수로 표현하거나, 함수 객체의 operator()로 표현하거나 람다 표현식으로 지정하거나 특정 클래스의 인스턴스에 있는 멤버 함수로 지정할 수도 있습니다. 각 방법을 하나씩 살펴보겠습니다.

 

2.1 Threads with Function Pointer

윈도우 시스템의 CreateThread(), _beginthread()와 같은 함수나 pthreads 라이브러리의 pthread_create()와 같은 스레드 함수는 매개변수를 하나만 받습니다. 반면 C++ 표준에서 제공하는 std::thread 클래스에서 사용하는 함수는 매개변수를 원하는 개수만큼 받을 수 있습니다.

 

예를 들어 다음과 같이 정수 두 개를 인수로 받는 counter() 함수를 살펴보겠습니다. 첫 번째 인수는 ID를 표현하고, 두 번째 인수는 이 함수가 루프를 도는 횟수를 표현합니다. 이 함수는 인수로 지정한 횟수만큼 표준 출력에 메세지를 보내는 문장을 반복합니다.

void counter(int id, int numIterations)
{
    for (int i = 0; i < numIterations; i++) {
        std::cout << "Counter: " << id << " has value " << i << std::endl;
    }
}

std::thread를 이용하면 이 함수를 여러 스레드로 실행하게 만들 수 있습니다. 예를 들어 인수로 1과 6을 지정해서 counter()를 수행하는 스레드인 t1을 다음과 같이 생성할 수 있습니다.

std::thread t1(counter, 1, 6);

thread 클래스 생성자는 가변인수 템플릿이기 때문에 인수 개수를 원하는 만큼 지정할 수 있습니다. 첫 번째 인수는 새로 만들 스레드가 실행할 함수의 이름입니다. 그 뒤에 나오는 인수는 스레드가 구동되면서 실행할 함수에 전달할 인수들입니다.

 

현재 시스템에서 thread 객체가 active 스레드로 표현될 때 이 thread 객체를 joinable하다고 표현합니다. 이런 스레드는 실행을 마치고 나서도, thread 객체가 joinable 상태를 유지합니다. 디폴트로 생성된 thread 객체는 unjoinable합니다. joinable한 thread 객체를 제거하려면, 먼저 그 객체의 join()이나 detach() 메소드를 호출해야 합니다. join()을 호출하는 것은 blocking call이며, join()을 호출한 스레드는 해당 thread 객체가 작업을 끝날 때까지 기다립니다. detach()를 호출하면 thread 객체를 OS 내부의 스레드와 분리합니다. 따라서 OS 스레드는 이 객체와 독립적으로 실행됩니다. 두 메소드는 모두 스레드를 unjoinable한 상태로 전환시킵니다. joinable한 상태의 thread 객체를 제거하면 그 객체의 소멸자는 std::terminate()를 호출해서 모든 스레드뿐만 아니라 어플리케이션도 종료시킵니다.

 

다음 코드는 counter() 함수를 실행하는 스레드를 2개 생성합니다. main()에서 스레드를 생성하고 나서 곧바로 두 스레드에 대해 join()을 호출합니다.

int main()
{
    std::thread t1(counter, 1, 6);
    std::thread t2(counter, 2, 4);

    t1.join();
    t2.join();
}

이 코드를 실행해보면, 아래와 같이 조금 이상하게 출력됩니다.

코드를 실행하는 시스템마다 결과가 달라질 수 있고, 같은 시스템에서도 실행할 때마다 결과가 달라질 수 있습니다. 두 스레드가 counter() 함수를 동시에 실행하므로 시스템에 장착된 코어 수와 OS의 스레드 스케줄링 방식에 따라 출력 형태가 달라집니다.

 

기본적으로 cout에 접근하는 작업은 thread-safe하기 때문에 여러 스레드 사이에서 데이터 경쟁이 발생하지 않습니다 (출력이나 입력 연산 직전에 cout.sync_with_stdio(false)를 호출하지 않았을 경우). 하지만 데이터 경쟁이 발생하지 않더라도 스레드마다 출력한 결과는 여전히 겹쳐질 수 있습니다.

 

동기화 기법을 적용하면 뒤섞이지 않게 만들 수 있는데, 이에 대해서는 뒤에서 살펴보겠습니다.

 

2.2 Thread with Function Object

이번에는 함수 객체로 스레드를 실행시키는 방법을 알아보도록 하겠습니다. 방금 소개한 방법대로 함수 포인터로 스레드를 만들면 함수에 인수를 전달하는 방식으로만 스레드에 정보를 전달할 수 있습니다. 반면 함수 객체로 만들면 그 함수 객체의 클래스에 멤버 변수를 추가해서 원하는 방식으로 초기화해서 사용할 수 있습니다.

 

다음 예제 코드는 먼저 Counter라는 클래스를 정의합니다. 이 클래스는 ID와 반복 횟수를 표현하는 멤버 변수를 가지고 있습니다. 두 변수 모두 생성자로 초기화됩니다. Counter 클래스를 함수 객체로 만들기 위해 operator()를 구현하였습니다.

class Counter
{
public:
    Counter(int id, int numIterations)
        : m_id{ id }, m_numIterations{ numIterations } {}

    void operator()() const {
        for (int i = 0; i < m_numIterations; i++) {
            std::cout << "counter " << m_id << " has value " << i << std::endl;
        }
    }
private:
    int m_id;
    int m_numIterations;
};

다음 코드는 함수 객체를 만든 스레드를 초기화하는 2가지 방법을 보여줍니다.

첫 번째 방법은 유니폼 초기화(uniform initialization) 문법을 사용합니다. Counter 생성자에 인수를 지정해서 인스턴스를 생성하면 그 값이 중괄호로 묶인 thread 생성자 인수로 전달됩니다.

두 번째 방법은 Counter 인스턴스를 일반 변수처럼 이름있는 인스턴스로 정의하고, 이를 thread 클래스의 생성자로 전달합니다.

int main()
{
    // Using uniform initialization syntax
    std::thread t1{ Counter{1, 20} };

    // Using named variable.
    Counter c{ 2, 12 };
    std::thread t2{ c };

    // Wait for threads to finish.
    t1.join();
    t2.join();
}
함수 객체는 항상 스레드의 내부 저장소에 복사됩니다. 함수 객체의 인스턴스를 복사하지 않고 그 인스턴스에 대해 operator()를 호출하려면 <functional> 헤더에 정의된 std::ref()나 cref()를 사용해서 인스턴스를 레퍼런스로 전달해야 합니다.
예를 들면, 다음과 같습니다.
Counter c{ 2, 12 };
std::thread t2{ std::ref(c) };​

 

2.3 Thread with Lambda

람다 표현식은 표현 C++ 스레드 라이브러리에 적합합니다. 예를 들어 람다 표현식으로 정의한 작업을 실행하는 스레드를 생성하는 코드를 다음과 같이 작성할 수 있습니다.

int main()
{
    int id{ 1 };
    int numIterations{ 5 };
    std::thread t1{ [id, numIterations] {
        for (int i = 0; i < numIterations; ++i) {
            std::cout << "Counter " << id << " has value " << i << std::endl;
        }
    } };
    t1.join();
}

 

2.4 Thread Local Storage

스레드에서 실행할 내용을 클래스의 멤버 함수로 지정할 수도 있습니다. 다음 코드는 기본 Request 클래스에 process() 메소드를 정의하고 있습니다. 그리고 main() 함수에서 Request 클래스의 인스턴스를 생성하고, Request 인스턴스인 req의 process() 메소드를 실행하는 스레드를 생성합니다.

class Request
{
public:
    Request(int id) : m_id{ id } {}
    void process() { std::cout << "Processing request: " << m_id << std::endl; }
private:
    int m_id;
};

int main()
{
    Request req{ 100 };
    std::thread t{ &Request::process, &req };
    t.join();
}

이렇게 하면 특정한 객체에 있는 메소드를 스레드로 분리해서 실행할 수 있습니다. 똑같은 객체를 여러 스레드가 접근할 때 데이터 경쟁이 발생하지 않도록 스레드에 안전하게 작성해야 합니다. 스레드에 안전하게 구현하는 방법 중 하나는 이어지는 내용에서 설명할 뮤텍스라는 동기화 기법을 활용하는 것입니다.

 

2.5 Thread Local Storage

C++ 표준은 스레드 로컬 저장소(thread local storage)란 개념을 제공합니다. 스레드 로컬로 지정하고자 하는 변수에 thread_local이란 키워드를 지정하면, 각 스레드마다 이 변수를 복사해서 스레드가 없어질 때까지 유지합니다. 이 변수는 각 스레드에서 한 번만 초기화됩니다.

예를 들어 다음 코드에는 두 개의 전역 변수가 정의되어 있는데, 모든 스레드가 하나의 k 복사본을 공유하며, 각 스레드는 자신의 고유한 n의 복사본을 가집니다.

int k;
thread_local int n;

 

다음 코드는 위 설명에 대한 내용을 보여주고 있습니다. threadFunction()은 k와 n의 현재 값을 콘솔에 출력하고 둘 다 1씩 증가시킵니다. main() 함수는 첫 번째 스레드를 실행하고, 이 스레드가 종료될 때까지 기다렸다가 두 번째 스레드를 실행시킵니다.

int k;
thread_local int n;

void threadFunction(int id)
{
    std::cout << "Thread " << id << ": k=" << k << ", n=" << n << std::endl;
    ++n;
    ++k;
}

int main()
{
    std::thread t1{ threadFunction, 1 }; t1.join();
    std::thread t2{ threadFunction, 2 }; t2.join();
}

위 코드를 실행한 결과는 다음과 같습니다.

여기서 모든 스레드가 하나의 k 인스턴스를 공유하고, 각 스레드들은 각자의 n을 가지고 있는 것을 확인할 수 있습니다.

 

만약 thread_local 변수를 함수 스코프 내에서 선언하면 모든 스레드가 복사본을 따로 갖고 있고, 함수를 아무리 많이 호출하더라도 스레드마다 단 한 번만 초기화된다는 점을 제외하면 static으로 선언할 때와 동일하게 동작합니다.

 

2.6 Canceling Threads

C++ 표준은 현재 실행 중인 스레드를 다른 스레드에서 중단시키는 메커니즘을 포함하지는 않습니다. 이렇게 다른 스레드를 종료시키기 위한 가장 좋은 방법은 여러 스레드가 공통으로 따르는 통신 메커니즘을 따르는 것입니다. 가장 간단한 방법은 공유 변수를 활용하는 것인데, 값을 전달받은 스레드는 이 값을 주기적을 확인하면서 중단 여부를 결정합니다. 나머지 스레드는 이러한 공유 변수를 이용해 이 스레드를 간접적으로 종료시킬 수 있습니다. 하지만 이때 조심해야 할 점이 있는데, 여러 스레드가 공유 변수에 접근하기 때문에 하나 이상의 스레드가 그 변수에 값을 쓸 수 있습니다. 따라서 이 변수를 아토믹(atomic)이나 조건 변수(condition variables)로 만드는 것이 좋습니다. 이에 대한 내용은 아래에서 다루도록 하겠습니다.

 

또 다른 방법으로 C++20부터 제공되는 jthread 클래스를 사용하는 것이 있는데, 아직 자세히 살펴보진 못해서 이번 포스팅에서는 다루지 않겠습니다 !

 

2.7 Retrieving Results from Threads

지금까지 살펴본 예제들에서 볼 수 있듯이 스레드를 새로 만드는 것은 어렵지 않습니다. 하지만 정작 중요한 부분은 스레드로 처리한 결과입니다. 예를 들어 스레드로 수학 연산을 수행하면 모든 스레드가 종료한 뒤에 나오는 최종 결과를 구해야 합니다. 한 가지 방법은 결과를 담은 변수에 대한 포인터나 레퍼런스를 스레드로 전달해서 스레드마다 결과를 저장하도록 만드는 것입니다. 또 다른 방법은 함수 객체의 클래스 멤버 변수에 처리 결과를 저장했다가 나중에 스레드가 종료할 때 그 값을 가져오는 것입니다. 이렇게 하려면 반드시 std::ref()를 이용해서 함수 객체의 레퍼런스를 thread 생성자에 전달해야 합니다.

 

그런데, 이보다 더 쉬운 방법이 있긴 합니다. 바로 future를 활용하는 것인데, 이를 사용하면 스레드 안에서 발생한 에러를 처리하기도 쉽습니다. 이에 대한 내용은 포스팅 마지막 부분에서 자세히 다루어보도록 하겠습니다.

 

2.8 Copying and Rethrowing Exceptions

스레드가 하나만 있을 때는 C++ 익셉션 메커니즘 관련 문제가 발생할 일이 없습니다. 그런데 스레드에서 던진 익셉션은 그 스레드 안에서 처리해야 합니다. 던진 익셉션을 스레드 안에서 잡지 못하면 C++ 런타임은 std::terminate()를 호출해서 어플리케이션 전체를 종료시킵니다. 한 스레드에서 던진 익셉션을 다른 스레드에서 잡을 수는 없습니다. 그래서 멀티스레드 환경에서 익셉션을 처리하는 과정에 여러 가지 문제가 발생합니다.

 

표준 스레드 라이브러리를 사용하지 않고도 스레드 사이에 발생한 익셉션을 처리할 수는 있지만 굉장히 힘듭니다. 이를 위해 표준 스레드 라이브러리는 다음과 같은 익셉션 관련 함수를 제공합니다. 이 함수는 std::exception뿐만 아니라 int, string, 커스텀 익셉션 등에도 적용됩니다.

  • exception_ptr current_exception() noexcept;
    이 함수는 catch 블록에서 호출하며, 현재 처리할 익셉션을 가리키는 exception_ptr 객체나 그 복사본을 리턴합니다. 현재 처리하는 익셉션이 없으면 null exception_ptr 객체를 리턴합니다. 이때 참조하는 익셉션 객체는 exception_ptr 타입의 객체가 존재하는 한 유효합니다. exception_ptr의 타입은 NullablePointer이기 때문에 간단히 if문을 작성해서 테스트하기가 쉽습니다. 이에 관련한 예제는 아래에서 살펴보겠습니다.

  • [[noreturn]] void rethrow_exception(exception_ptr p);
    이 함수는 exception_ptr 매개변수가 참조하는 익셉션을 다시 던집니다. 참조한 익셉션을 반드시 그 익셉션이 처음 발생한 스레드 안에서만 다시 던져야 한다는 법은 없습니다. 그래서 여러 스레드에서 발생한 익셉션을 처리하는 용도로 딱 맞습니다. [[noreturn]] 속성은 이 함수가 정상적으로 리턴하지 않는다는 것을 선언합니다.

  • template<class E> exception_ptr make_exception_ptr(E e) noexcept;
    이 함수는 주어진 익셉션 객체의 복사본을 참조하는 exception_ptr 객체를 생성합니다. 실질적으로 다음 코드의 축약 표현입니다.
try {
    throw e;
}
catch (...) {
    return current_exception();
}

 

이러한 함수로 스레드에서 발생한 익셉션을 처리하는 방법을 살펴보겠습니다.

다음 코드는 일정한 작업을 수행한 뒤 익셉션을 던지는 함수를 정의합니다. 이 함수는 별도 스레드로 실행합니다.

void doSomeWork()
{
    for (int i = 0; i < 5; i++) {
        std::cout << i << std::endl;
    }
    std::cout << "Thread throwing a runtime_error exception...\n";
    throw std::runtime_error{ "Exception from thread" };
}

 

다음 threadFunc() 함수는 doSomeWork()가 던진 익셉션을 모두 받도록 try/catch 블록으로 묶습니다. threadFunc()은 exception_ptr& 타입 인수 하나만 받습니다. 익셉션을 잡았다면 current_exception() 함수를 이용하여 처리할 익셉션에 대한 레퍼런스를 받아서 exception_ptr 매개변수에 대입합니다. 그런 다음 스레드는 정상적으로 종료합니다.

void threadFunc(std::exception_ptr& err)
{
    try {
        doSomeWork();
    }
    catch (...) {
        std::cout << "Thread caught exception, returning exception...\n";
        err = std::current_exception();
    }
}

 

아래의 doWorkInThread() 함수는 메인 스레드에서 호출됩니다. 이 함수는 스레드를 생성해서 그 안에 담긴 threadFunc() 의 실행을 시작하는 역할을 담당합니다. threadFunc()의 인수로 exception_ptr 타입 객체에 대한 레퍼런스를 지정합니다. 일단 스레드가 생성되면 doWorkInThread() 함수는 join() 메소드를 이용하여 이 스레드가 종료될 때까지 기다리고, 그 후 에러 객체가 발생하는지 검사합니다. exception_ptr은 NullablePointer 타입이기 때문에 if문으로 간단하게 검사할 수 있습니다. 이 값이 널이 아니라면 현재 스레드에 그 익셉션을 다시 던집니다. 이 예제에서는 메인 스레드가 현재 스레드입니다. 이 익셉션을 메인 스레드에서 다시 던지기 때문에 한 스레드에서 다른 스레드로 전달됩니다.

void doWorkInThread()
{
    std::exception_ptr error;
    // Launch thread
    std::thread t{ threadFunc, std::ref(error) };
    // Wait for thread to finish
    t.join();
    if (error) {
        std::cout << "Main thread received exception, rethrowing it...\n";
        std::rethrow_exception(error);
    }
    else {
        std::cout << "Main thread did not receive any exception.\n";
    }
}

 

여기서 구현한 main() 함수는 간단합니다. doWorkInThread()를 호출하고, doWorkInThread()에서 생성한 스레드가 던진 익셉션을 받도록 try/catch 블록을 작성합니다.

int main()
{
    try {
        doWorkInThread();
    }
    catch (const std::exception& e) {
        std::cout << "Main function caught: '" << e.what() << "'" << std::endl;
    }
}

위 코드의 실행 결과는 다음과 같습니다.

여기서는 예제를 간결하게 구성하기 위해 main 스레드에서 join()을 호출해서 메인 스레드에 블록시키고 스레드가 모두 마칠 때까지 기다립니다. 물론 실전에서는 이렇게 메인 스레드를 블록시키면 안됩니다. 예를 들어 GUI 어플리케이션에서 메인 스레드를 블록시키면 UI가 반응하지 않게 됩니다. 이럴 때는 스레드끼리 메세지로 통신하는 기법을 사용하는 것이 좋습니다. 예를 들어 앞에서 본 threadFunc() 함수는 current_exception() 결과의 복사본을 인자로 하여 UI 스레드로 메세지를 보낼 수 있습니다. 하지만 앞서 이야기했듯이 이렇게 하더라도 생성된 스레드에 대해 join()이나 detach()를 호출해야 합니다.

 


3. Atomic Operations Library

아토믹 타입(atomic type)을 사용하면 동기화 기법을 적용하지 않고 읽기와 쓰기를 동시에 처리하는 아토믹 액세스(atomic access)가 가능합니다. 아토믹 연산을 사용하지 않고 변수의 값을 증가시키면 스레드에 안전하지 않습니다. 컴파일러는 먼저 메모리에서 이 값을 읽고, 레지스터로 불러와서 값을 증가시킨 다음, 그 결과를 메모리에 다시 저장합니다. 그런데 이 과정에서 같은 메모리 영역을 다른 스레드가 건드리면 데이터 경쟁이 발생합니다.

예를 들어 다음 코드는 스레드에 안전하지 않게 작성이 되어 데이터 경쟁이 발생하는 상황을 보여줍니다.

int counter{ 0 }; // global variable
...
++counter;        // Executed in multiple threads

이 변수에 std::atomic 타입을 적용하면 다음에 설명할 뮤텍스 객체와 같은 동기화 기법을 따로 사용하지 않고도 스레드에 안전하게 만들 수 있습니다. 위의 코드를 atomic 타입을 사용하도록 고치면 다음과 같습니다.

std::atomic<int> counter{ 0 }; // global variable
...
++counter;                     // Excuted in multiple threads

 

아토믹 타입을 사용하려면 <atomic> 헤더 파일을 인클루드해야 합니다. C++ 표준은 언어에서 제공하는 모든 기본 타입마다 이름이 지정된 정수형 아토믹 타입(named integral atomic type)을 정의하고 있습니다.

다음 표는 몇 가지 아토믹 타입을 보여줍니다.

아토믹 타입을 사용할 때는 동기화 메커니즘을 명시적으로 사용하지 않아도 됩니다. 하지만 특정 타입에 대해 아토믹 연산으로 처리할 때는 뮤텍스와 같은 동기화 메커니즘을 내부적으로 사용하기도 합니다. 예를 들어 연산을 아토믹으로 처리하는 인스트럭션을 타겟 하드웨어에서 제공하지 않을 수 있습니다. 이런 경우에는 아토믹 타입에 대해 is_lock_free() 메소드를 호출해서 lock-free 인지, 즉, 명시적으로 동기화 메커니즘을 사용하지 않고도 수행할 수 있는지 확인해야 합니다.

 

std::atomic 클래스 템플릿은 정수 타입뿐만 아니라 다른 모든 종류의 타입에 대해서도 적용할 수 있습니다. 예를 들어 atomic<double>이나 atomic<MyType>과 같이 쓸 수 있습니다. 단, MyType을 쉽게 복사할 수 있어야 합니다. 지정한 타입의 크기에 따라 내부적으로 동기화 메커니즘을 사용해야 할 수도 있습니다.

다음 예제 코드를 보면 Foo와 Bar는 쉽게 복사할 수 있습니다. 다시 말해 이 두 클래스에 대해 std::is_trivially_copyable_v가 true 입니다. 하지만 atomic<Foo>는 lock-free가 아니고, atomic<Bar>는 lock-free 입니다.

class Foo { private: int m_array[123]; };
class Bar { private: int m_int; };

int main()
{
    std::atomic<Foo> f;
    // Outputs: 1 0
    std::cout << std::is_trivially_copyable_v<Foo> << " " << f.is_lock_free() << std::endl;
    std::atomic<Bar> b;
    // Outputs: 1 1
    std::cout << std::is_trivially_copyable_v<Bar> << " " << b.is_lock_free() << std::endl;
}

일정한 데이터를 여러 스레드가 동시에 접근할 때 아토믹을 사용하면 메모리 순서, 컴파일러 최적화 등과 같은 문제를 방지할 수 있습니다. 기본적으로 아토믹이나 동기화 메커니즘을 사용하지 않고서 동일한 데이터를 여러 스레드가 동시에 읽고 쓰는 것은 위험합니다.

 

3.1 Atomic Operations

C++ 표준에서는 여러 가지 아토믹 연산을 정의하고 있습니다. 그중 몇 가지만 살펴보도록 하겠습니다.

 

아토믹 연산에 대한 첫 번째 예제는 다음과 같습니다.

bool std::atomic<T>::compare_exchange_strong(T& expected, T desired);

이 연산을 아토믹하게 수행하도록 구현하면 다음과 같습니다. 여기서는 pseudo 코드로 표현했습니다.

if (*this == expected) {
    *this = desired;
    return true;
}
else {
    expected = *this;
    return false;
}

얼핏보면 조금 이상하지만, 데이터 구조를 락을 사용하지 않고 동시에 접근하게 만드는 데 핵심적인 연산입니다. 이렇게 lock-free 동시성 데이터 구조(concurrent data structure)를 이용하면 이 데이터 구조에 대해 연산을 수행할 때 동기화 메커니즘을 따로 사용하지 않아도 됩니다. 하지만 이렇게 데이터 구조를 표현하는 기법은 고급 주제에 해당하며 여기서 다루지는 않겠습니다.

CUDA Instructions (2) - Instruction 최적화

 

CUDA Instructions (2) - Instruction 최적화

References Professional CUDA C Programming Contents Single-Precision vs. Double-Precision Standard vs. Intrinsic Functions Understanding Atomic Instructions CUDA Instructions (1) 지난 포스팅을 통해..

junstar92.tistory.com

혹시 atomic 연산에 대해 조금 더 이해하고 싶으시다면, 위 포스팅의 마지막 부분에 아토믹 연산에 대한 내용을 다루고 있으니 참조하시면 도움이 되실 것 같습니다.. !

 

두 번째 예제는 정수 아토믹 타입에 적용되는 atomic::fetch_add()를 사용하는 것입니다. fetch_add()는 주어진 아토믹 타입의 현재 값을 가져와서 지정한 값만큼 증가시킨 다음, 증가시키기 전의 원래 값을 리턴합니다. 예를 들면 다음과 같습니다.

int main()
{
    std::atomic<int> value{ 10 };
    std::cout << "Value = " << value << std::endl;
    int fetched{ value.fetch_add(4) };
    std::cout << "Fetched = " << fetched << std::endl;
    std::cout << "Value = " << value << std::endl;
}

fetched와 value 변수의 값을 건드리는 다른 스레드가 없다면 결과는 다음과 같습니다.

 

정수형 아토믹 타입은 fetch_add(), fetch_sub(), fetch_and(), fetch_or(), fetch_xor(), ++, --, +=, -=, &=, ^=, |=과 같은 아토믹 연산을 지원합니다. 아토믹 포인터 타입은 fetch_add(), fetch_sub(), ++, --, +=, -=을 지원합니다.

C++20 이전에는 부동소수점 타입에 std::atomic을 사용(ex, atomic<float>, atomic<double>)하여 atomic reading/writing을 제공했지만, atomic 산술 연산은 제공하지 않았습니다. C++20에서는 부동소수점 아토믹 타입에 fetch_add()와 fetch_sub()를 지원하도록 추가되었습니다.

 

대부분의 아토믹 연산은 원하는 메모리 순서를 지정하는 매개변수를 추가로 받습니다. 예를 들면 다음과 같습니다.

T atomic<T>::fetch_add(T value, memory_order = memory_order_seq_cst);

그러면 디폴트로 설정된 memory_order를 다른 값으로 변경할 수 있습니다. C++ 표준은 이를 위해서 memory_order_relaxed, memory_order_consume, memory_order_acquire, memory_order_release, memory_order_acq_rel, memory_order_seq_cst를 제공하며, 모두 std 네임스페이스 아래에 정의되어 있습니다. 하지만 디폴트값이 아닌 다른 값을 지정할 일은 별로 없습니다. 디폴트보다 나은 성능을 보여주는 메모리 순서가 있지만 자칫 잘못하면 데이터 경쟁이 발생하거나 디버깅하기 힘든 스레드 관련 문제가 발생할 수 있습니다. 

 

3.2 Atomic Smart Pointer (C++20)

C++20부터는 <memory> 헤더를 통해 atomic<std::shared_ptr<T>>을 지원합니다. 이는 이전 버전의 C++ 표준에서는 허용되지 않았는데, 그 이유는 shared_ptr이 복사될 수 없기 때문(not trivially copyable)입니다. shared_ptr의 reference count를 저장하는 control block은 항상 thread-safe하며, 객체를 가리키는 포인터가 정확히 단 한 번만 삭제되는 것을 보장합니다. 그러나 shared_ptr의 다른 것들은 thread-safe하지 않습니다. 만약 reset()과 같은 비상수(non-const) 메소드가 shared_ptr 인스턴스에서 호출된다면 여러 스레드에서 동시에 사용되는 동일한 shared_ptr은 데이터 경쟁을 일으킵니다. 

반면, 여러 스레드에서 동일한 atomic<shared_ptr<T>>를 사용할 때는, 비상수 shared_ptr 메소드를 호출이 thread-safe 합니다.

 

3.3 Atomic References (C++20)

C++20에서는 std::atomic_ref 도 지원합니다. 기본적으로 std::atomic과 동일하며, 인터페이스도 동일합니다만 이것은 레퍼런스에서 동작합니다. 반면에 atomic은 항상 제공되는 값의 복사본을 생성합니다. atomic_ref 인스턴스는 이를 참조하는 객체보다 더 짧은 lifetime을 가져야합니다. atomic_ref는 복사 가능하고, 같은 객체를 가리키는 atomic_ref 인스턴스를 많이 만들수 있습니다. 만약 특정 객체를 참조하는 atomic_ref 인스턴스가 있다면, 그 객체는 atomic_ref 인스턴스 중의 하나를 거치지 않고는 건들일 수 없습니다. atomic_ref<T> 클래스 템플릿은 복사 가능한 타입(trivially copyable type) T에서 사용될 수 있습니다. 또한, 표준 라이브러리는 아래의 내용들을 제공합니다.

  • fetch_add()와 fetch_sub()를 지원하는 포인터 타입에 대한 partial specialization
  • fetch_add(), fetch_sub(), fetch_and(), fetch_or() fetch_xor()을 지원하는 정수 타입에 대한 Full specialization
  • fetch_add(), fetch_sub()를 지원하는 부동소수점에 대한 Full specialization

atomic_ref를 사용하는 방법은 아래에서 살펴보겠습니다.

 

3.4 Using Atomic Types

이번에는 위에서 atomic 타입을 어떻게 사용할 수 있는지 간단히 알아보겠습니다. 여기서 우리는 increment()라는 함수를 사용할건데, 이 함수는 루프 내에서 정수 레퍼런스 파라미터를 1씩 증가시킵니다. 이 코드에는 std::this_thread::sleep_for()을 사용하는데, 이는 각 루프에서 약간의 딜레이를 주는 역할을 합니다. sleep_for()의 인수는 std::chrono::duration 타입입니다.

using namespace std::literals::chrono_literals;

void increment(int& counter)
{
    for (int i = 0; i < 100; i++) {
        ++counter;
        std::this_thread::sleep_for(1ms);
    }
}

 

이제 병렬로 여러 스레드를 실행하는데, 각 스레드는 공유하는 counter 변수에 대해 increment() 함수를 실행합니다. atomic 타입을 사용하지 않거나 어떠한 스레드 동기화 기법을 사용하지 않으면 데이터 경쟁이 발생합니다. 아래 코드는 10개의 스레드를 생성하고, 각 스레드는 join()을 호출함으로써 다른 스레드들이 끝날 때까지 대기합니다.

int main()
{
    int counter{ 0 };
    std::vector<std::thread> threads;
    for (int i = 0; i < 10; i++) {
        threads.push_back(std::thread{ increment, std::ref(counter) });
    }

    for (auto& t : threads) {
        t.join();
    }
    std::cout << "Result = " << counter << std::endl;
}

increment() 함수는 counter 파라미터를 100번 증가시키고, 스레드가 10개가 실행되었기 때문에 예상되는 결과는 1000입니다. 하지만 이 함수를 실행시켜보면 1000이 아닌 더 적은 수가 출력되며, 실행할 때마다 다른 값을 가집니다.

위 예제 코드는 데이터 경쟁을 보여주고 있습니다.

이번에는 atomic 타입을 사용하여 위 코드를 수정해보겠습니다.

void increment(std::atomic<int>& counter)
{
    for (int i = 0; i < 100; i++) {
        ++counter;
        std::this_thread::sleep_for(1ms);
    }
}

int main()
{
    std::atomic<int> counter{ 0 };
    std::vector<std::thread> threads;
    for (int i = 0; i < 10; i++) {
        threads.push_back(std::thread{ increment, std::ref(counter) });
    }

    for (auto& t : threads) {
        t.join();
    }
    std::cout << "Result = " << counter << std::endl;
}

<atomic> 헤더를 include하고 공유되는 counter 변수를 int가 아닌 atomic<int> 타입으로 변경합니다. 이렇게 수정한 버전의 코드를 실행하면 항상 1000이라는 결과를 얻을 수 있습니다.

이렇게 하면 명시적인 동기화 메커니즘을 사용하지 않고도, ++counter 연산은 아토믹으로 수행되기 때문에 어떠한 인터럽트도 받지 않습니다.

 

C++20에서 지원하는 atomic_ref를 사용해도 데이터 경쟁을 해결할 수 있습니다.

void increment(int& counter)
{
    std::atomic_ref<int> atomicCounter{ counter };
    for (int i = 0; i < 100; i++) {
        ++atomicCounter;
        std::this_thread::sleep_for(1ms);
    }
}

int main()
{
    int counter{ 0 };
    std::vector<std::thread> threads;
    for (int i = 0; i < 10; i++) {
        threads.push_back(std::thread{ increment, std::ref(counter) });
    }

    for (auto& t : threads) {
        t.join();
    }
    std::cout << "Result = " << counter << std::endl;
}

 

하지만, 이렇게 아토믹 연산을 사용하면 성능이라는 또 다른 문제가 발생합니다. 이렇게 counter에 대한 연산을 아토믹으로 처리하면 한 번에 한 스레드만 ++counter 연산을 수행하기 때문에 성능의 하락이 발생합니다. 위 예제에서 성능 문제를 해결할 수 있는 아주 간단한 방법은 increment() 함수가 로컬 변수를 사용하여 1씩 증가하는 결과값을 계산하고, 루프가 끝난 후에 이 결과를 counter 레퍼런스에 더하는 것입니다. 이전에는 1000번의 아토믹 연산이 수행되지만, 아래처럼 코드를 수정하면 10번의 아토믹만 수행됩니다.

void increment(int& counter)
{
    int result{ 0 };
    for (int i = 0; i < 100; i++) {
        ++result;
        std::this_thread::sleep_for(1ms);
    }
    counter += result;
}

 

3.5 Watinig on Atomic Variables (C++20)

C++20에는 아래 표의 메소드들이 std::atomic과 atomic_ref에 추가되었는데, 이를 사용하면 아토믹 변수가 수정될 때까지 효율적으로 기다릴 수 있습니다.

이 메소드는 다음과 같이 사용할 수 있습니다.

int main()
{
    std::atomic<int> value{ 0 };

    std::thread job{ [&value] {
        std::cout << "Thread starts waiting.\n";
        value.wait(0);
        std::cout << "Thread wakes up, value = " << value << std::endl;
    } };

    std::this_thread::sleep_for(2s);

    std::cout << "Main thread is going to change value to 1.\n";
    value = 1;
    value.notify_all();

    job.join();
}

실행 결과는 다음과 같습니다.

 


 

다음 포스팅에 이어서 멀티스레딩 프로그래밍에 대해 알아보겠습니다 !

[C++] 멀티스레딩 프로그래밍 (2)

 

댓글