본문 바로가기
프로그래밍/병렬프로그래밍

[OpenMP] Critical Section

by 별준 2021. 11. 25.

References

  • An Introduction to Parallel Programming

Contents

  • Producer / Consumer 구조 프로그램 (메세지 패싱)
  • critical directive
  • explicit barrier
  • atomic directive
  • omp lock

이번 포스팅에서는 parallel for이나 for 디렉티브로는 병렬화하기 어려운 문제에 대해서 살펴보겠습니다.

 

메세지를 보내고 받는 프로그램을 작성해보려고 하는데, 이 프로그램에서 각 스레드는 공유하는 메세지 큐를 가지고 있으며, 하나의 스레드가 다른 스레드에 메세지를 전송하려고 하면, 그 메세지는 목적지가 되는 스레드의 큐에 enqueue됩니다. 그리고 목적지 스레드에서는 메세지 큐를 dequeue하여 가장 먼저 들어온 메세지를 수신할 수 있습니다.

 

큐에 대해서는 다들 아시리라 생각하고, 자세한 설명은 넘어가도록 하겠습니다.

https://github.com/junstar92/parallel_programming_study/tree/master/OpenMP/queue

 

GitHub - junstar92/parallel_programming_study

Contribute to junstar92/parallel_programming_study development by creating an account on GitHub.

github.com

큐를 위와 같이 구현하였고, 큐의 인터페이스는 다음과 같습니다. USE_OMP_LOCK은 정의되지 않았다고 생각해주세요.

#ifndef _QUEUE_H_
#define _QUEUE_H_
#ifdef USE_OMP_LOCK
#include <omp.h>
#endif

typedef struct queue_node_s {
    int src;
    int msg;
    struct queue_node_s* next_p;
} QNode;

typedef struct queue_s {
#ifdef USE_OMP_LOCK
    omp_lock_t lock;
#endif
    int enqueued;
    int dequeued;
    QNode* front_p;
    QNode* tail_p;
} Queue;

Queue* Allocate_queue(void);
void Free_queue(Queue* q);
void Print_queue(Queue* q);
void Enqueue(Queue* q, int src, int msg);
int Dequeue(Queue* q, int* src, int* msg);
int Search(Queue* q, int msg, int* src);

#endif

 

그럼 이제 각 스레드가 무작위로 정수형 메세지를 생성하고 그 메세지를 무작위로 다른 스레드에 전송하는 간단한 메세지 패싱 프로그램을 구현해보도록 하겠습니다. 메세지를 생성한 후에 스레드는 그 메세지를 적절한 메세지 큐에 enqueue합니다. 메세지를 전송한 후에 스레드는 수신받은 메세지가 있는지 큐를 체크하고, 만약 수신받은 메세지가 있으면 큐에서 첫 번째 메세지를 dequeue하고 그 메세지를 출력합니다. 따라서, 각 스레드는 메세지 전송과 메세지 수신을 번갈아 가면서 수행하게 됩니다. 스레드가 메세지 전송을 완료하면 모든 스레드가 완료할 때까지 메세지를 수신하고, 그모든 스레드가 전송을 완료하고 수신까지 완료되면 스레드는 종료하고 프로그램이 종료되도록 구현할 것입니다.

 

각 스레드의 동작을 의사코드로 작성하면 다음과 같습니다.

 

메세지 전송

메세지를 enqueue하기 위해서 메세지 큐에 액세스하는 것은 critical section내에서 이루어져야 할 것입니다. 큐는 링크드 리스트(linked list)를 사용하여 구현되어 있고, 큐의 처음과 끝을 가리키는 포인터를 변수로 가지고 있습니다. 만약 새로운 메세지를 enqueue하면 큐의 끝을 가리키는 포인터는 새로 들어온 메세지 노드를 가리키게 될 것입니다. 하지만 두 개의 스레드가 동시에 enqueue 작업을 수행하면 하나의 스레드가 enqueue하고 있을 때, 다른 스레드는 메세지를 잃어버릴 수 있습니다. 따라서 두 개의 오퍼레이션이 충돌하게 되므로 메세지를 큐에 넣는 것은 크리티컬 섹션이 되어야 합니다. 따라서 ciritical directive를 사용하여 크리티컬 섹션을 설정해줍니다.

의사코드로 작성하면 다음과 같습니다.

위의 코드는 한 번에 하나의 스레드만이 메세지를 전송할 수 있도록 해줍니다.

 

메세지 수신

메세지를 수신할 때의 동기화 문제는 전송과는 조금 다릅니다. 큐를 소유하고 있는 스레드는 본인에게 전송된 메세지를 큐에서 dequeue합니다. dequeue는 한 번에 하나의 메세지에 대해서만 이루어지기 때문에 큐에 2개의 메세지가 있다면 Dequeue 호출이 Enqueue 호출과 충돌할 가능성은 없습니다. 따라서 적어도 두 개 이상의 메세지가 큐에 존재하는 경우 큐의 크기를 다른 변수를 통해 보관하면 동기화로 인한 문제를 피할 수 있습니다. 따라서 큐에 두 개 이상의 메세지가 존재하는 경우에는 크리티컬 섹션을 설정할 필요가 없습니다.

 

그렇다면 큐의 크기를 저장하는 변수는 어떻게 설정하면 될까요? 아래처럼 enqueued와 dequeued라는 두 개의 변수를 사용하여 큐의 크기를 저장할 수 있는데, 이 변수를 사용하면 큐에 있는 메세지의 수는 

위와 같이 계산할 수 있습니다. enqueued는 enqueue된 메세지의 수이고, dequeued는 dequeued된 메세지의 수입니다.

여기서 dequeued를 업데이트하는 스레드만이 큐를 소유해야합니다. 만약 다른 스레드가 queue_size를 계산하는 것과 동시에 또 다른 스레드가 enqueued를 업데이트한다고 가정해봅시다. 그렇다면 queue_size를 계산하는 스레드는 enqueued의 이전값 또는 새로운값을 얻어오게 됩니다. 따라서 queue_size가 1이나 2가 되어야 하는 순간에 0이나 1로 얻을 수 있다는 것입니다. 하지만 우리 프로그램에서 큰 문제는 발생하지 않고, 약간의 딜레이만 발생될 뿐입니다. 만약 queue_size가 1이 되어야하는 순간 0을 얻었다고 하더라도, 다음 반복에서 다시 체크할 것입니다. 또한 queue_size가 2지만 1인 순간에도 크리티컬 섹션에만 접근하는 것이지 동작상 문제는 없습니다. 따라서 queue_size가 1일 때, dequeue를 하는 순간에만 크리티컬 섹션을 설정하고, 다른 부분에서는 크리티컬 섹션을 설정하지 않아도 문제가 없습니다.

의사코드로 나타내면 다음과 같습니다.

 

프로그램 종료 확인

스레드가 종료되는 조건에 대해서도 생각해봐야합니다. 만약 단순히 스레드의 큐에 있는 메세지 개수만 체크해서 종료 여부를 결정하면 어떻게 될까요?

위 코드는 다음의 문제점을 갖고 있습니다. 스레드 a가 queue_size를 0으로 계산을 완료한 후에, 다른 스레드 b가 스레드 a에게 메세지를 전송한다면, 스레드 a는 스레드 b가 보낸 메세지를 영원히 수신할 수 없게 됩니다.

 

우리가 작성할 프로그램은 메세지의 전송과 수신을 for 루프를 통해서 반복 수행할 예정인데, 이말은 for 루프가 끝난 후에는 어떠한 메세지도 전송을 하지 않는다는 것을 의미합니다. 따라서 우리는 done_sending이라는 카운터 변수를 추가하여 각 스레드가 for 루프를 완료할 때마다 이 변수의 값을 1씩 증가시켜 스레드의 종료를 확인할 때 이 변수를 사용하도록 할 것입니다.


메세지 패싱 프로그램 작성

그렇다면 위의 조건들을 만족하는 프로그램을 작성해보겠습니다. 전체 코드는 아래 링크에서 참조하실 수 있습니다.

https://github.com/junstar92/parallel_programming_study/blob/master/OpenMP/11_omp_msg.c

 

GitHub - junstar92/parallel_programming_study

Contribute to junstar92/parallel_programming_study development by creating an account on GitHub.

github.com

 

먼저 프로그램에서 사용되는 인터페이스 함수들과 메인 함수입니다.

#include <stdio.h>
#include <stdlib.h>
#include <omp.h>
#include "queue/queue.h"

const int MAX_MSG = 10000;

void Usage(char* prog_name);
void Send_msg(Queue* msg_queues[], int my_rank, int thread_count, int msg_number);
void Try_receive(Queue* q, int my_rank);
int Done(Queue* q, int done_sending, int thread_count);

int main(int argc, char* argv[])
{
    if (argc != 3)
        Usage(argv[0]);
    int thread_count = strtol(argv[1], NULL, 10);
    int send_max = strtol(argv[2], NULL, 10);
    if (thread_count <= 0 || send_max < 0)
        Usage(argv[0]);

    Queue** msg_queues = (Queue**)malloc(thread_count * sizeof(Queue*));
    int done_sending = 0;
#pragma omp parallel num_threads(thread_count) \
    default(none) shared(thread_count, send_max, msg_queues, done_sending)
    {
        int my_rank = omp_get_thread_num();
        srand(my_rank);
        msg_queues[my_rank] = Allocate_queue();

#pragma omp barrier /*  Don't let any threads send messages 
                        until all queue are contructed      */
        for (int msg_number = 0; msg_number < send_max; msg_number++) {
            Send_msg(msg_queues, my_rank, thread_count, msg_number);
            Try_receive(msg_queues[my_rank], my_rank);
        }
#pragma omp atomic
        done_sending++;
#ifdef DEBUG
        printf("Thread %d > done sending\n", my_rank);
#endif

        while (!Done(msg_queues[my_rank], done_sending, thread_count))
            Try_receive(msg_queues[my_rank], my_rank);
        
        /*  My queue is empty, and everyone is done sending
            So my queue won't be accessed again, and it's OK to free it */
        Free_queue(msg_queues[my_rank]);
        free(msg_queues[my_rank]);
    } /* omp parallel */

    free(msg_queues);
    return 0;
}

프로그램이 시작하면 마스터 스레드에서는 커맨드 라인의 인수들을 읽어서 스레드의 개수(thread_count)와 전송할 메세지의 개수(send_max)를 설정합니다. 그런 다음 각 스레드에서 사용할 메세지 큐 배열(msg_queues)을 할당하고, 이 배열은 스레드 간에 공유되어야 합니다(스레드 간 메세지 전송을 위해서). 이중 배열이라는 것에 주의해야 합니다.

여기서 메세지 큐는 구조체이며 다음의 항목들을 저장하고 있습니다.

  • 메세지 리스트
  • 큐의 front, rear에 위치한 데이터의 주소
  • enqueue된 메세지의 카운트
  • dequeue된 메세지의 카운트

메세지 큐 배열이 준비되면 parallel 디렉티브를 사용하여 각 스레드를 시작합니다. (line 24 ~)

여기서 중요한 점은 각 스레드에서 메세지 큐가 실제로 할당된다는 점입니다. 따라서, 모든 스레드에서 메세지 큐가 할당되기 전에 어느 한 스레드가 메세지 전송을 시작해버리면 프로그램이 종료되는 문제가 발생할 수 있습니다. 따라서 모든 스레드에서 메세지 큐가 할당될 때까지 메세지를 전송하지 않도록 해야합니다. parallel 디렉티브 뒤에 오는 블록 끝에서는 묵시적 배리어가 적용되지만, 현재 상황에서는 병렬화된 블록 내부에서 배리어가 필요합니다. 이를 위해서 OpenMP에서는 명시적 배리어(explicit barrier)를 제공합니다.

#pragma omp barrier

이 부분이 line 31에 적용되어 있습니다. 각 스레드에서 이 부분을 만나게 되면 같은 팀에 있는 모든 스레드가 이 배리어에 도달할 때까지 블록되어서 대기하게 됩니다. 그리고 모든 스레드가 이 배리어에 도달하게 되면 팀 안의 스레드들은 블록을 해제하고 다시 작업을 수행합니다.

 

atomic directive

따라서, 모든 스레드에서 메세지 큐 할당이 완료되면 for 루프를 통해서 무작위로 메세지를 전송하고 수신하는 작업을 반복하게 됩니다. 정해진 횟수(send_max)만큼 전송 및 수신을 완료하면 각 스레드는 done_sending 변수의 값을 1 증가시킵니다. 이때, 당연히 done_sending 변수의 값을 증가시키는 것은 크리티컬 섹션에서 수행되어야 합니다. OpenMP에서 critical 디렉티브보다 편리하고 강력한 크리티컬 섹션 기능을 제공하는데, 바로 atomic 디렉티브입니다.

#pragma omp atomic

atomic 디렉티브를 사용하면 critical 디렉티브와는 달리 하나의 C 문장을 크리티컬 섹션으로 보호할 수 있습니다. 그리고 보호할 문장에는 제약이 있는데, 아래의 형태의 문장만 보호할 수 있습니다. (정확히는 아래 문장의 x 변수만 보호)

여기서 <op>는 다음의 이진 연산자 중의 하나가 되어야 합니다.

또한, <expression>은 레퍼런스는 사용할 수 없습니다.

atomic 디렉티브는 아래처럼 사용할 수 있는데,

#pragma omp atomic
x += y++;

이때, x의 load와 store는 크리티컬 섹션으로 보호받습니다. 따라서, 한 스레드에서 x에 대한 업데이트를 진행하고 있을 때, 다른 스레드에서는 이 크리티컬 섹션에 진입할 수 없습니다. 하지만 y 변수에 대해서는 보호되지 않으며 따라서 예상하지 못한 결과가 나타날 수 있습니다.

 

이어서 프로그램을 살펴보면, done_sending 값을 증가시키고 나서 while 반복문을 통해서 메세지 전송 및 수신의 종료 여부를 계속 체크합니다. (line 43)

메세지 전송이 완료되더라도 다른 스레드에서 해당 스레드에 메세지를 전송할 수 있기 때문에 다른 스레드들도 전송이 완료될 때까지 계속해서 메세지 수신을 확인(line 44)하고, 모든 스레드가 전송이 완료된 후에야 각 스레드는 수신된 메세지를 모두 출력하고 종료됩니다. 종료되기 전에 메세지 큐를 해제해줍니다(line 48-49).

 

2개의 스레드로 각각 5개의 메세지를 전달하도록 실행하면 다음과 같은 출력 결과를 확인할 수 있습니다.


크리티컬 섹션과 잠금(lock)

메세지 패싱 프로그램에서 사용된 ciritical directive에 대해서 조금 더 살펴보겠습니다. 앞서 살펴본 프로그램에는 여러 개의 크리티컬 섹션을 가지고 있고, critical 또는 atomic 디렉티브를 사용하여 다음의 3개의 코드 블록을 크리티컬 섹션으로 설정했습니다. 따라서 이 코드 블록은 무조건 하나의 스레드만 접근할 수 있습니다.

  • done_sending++;
  • Enqueue(msg_queues[dest], my_rank, msg);
  • Dequeue(q, &src, &msg);

그러나 이 3개의 코드 블록이 서로 배타적으로 액세스하도록 할 필요는 없습니다. Enqueue와 Dequeue 코드 블록도 배타적일 필요가 없습니다. 예를 들어, 스레드 0이 스레드 1의 큐에 메세지를 enqueue할 때, 동시에 스레드 1이 스레드 2의 큐에 메세지를 enqueue해도 상관없다는 것입니다.

정확히 말하자면, OpenMP 관점에서 메세지 패싱 프로그램은 두 개의 크리티컬 섹션을 가지고 있습니다. 하나는 atomic 디렉티브로 보호되고 있는 done_sending++ 이며, 다른 하나는 메세지는 enqueue하고 dequeue하는 것입니다. 따라서 이 두 개의 크리티컬 섹션은 배타적이지 않고 동시에 진입할 수 있는 각각의 크리티컬 섹션입니다.

하지만 Enqueue와 Dequeue의 크리티컬 섹션은 서로 배타적이며 이 부분은 스레드 간에서 시리얼로 실행됩니다. 따라서 프로그램의 성능을 떨어뜨릴 수 있습니다.

#pragma omp critical(name)

OpenMP는 ciritical 디렉티브에 name을 추가할 수 있는 옵션을 제공하는데, 서로 다른 name을 사용하는 critical 디렉티브로 보호되는 두 개의 블록은 동시에 실행이 가능합니다. 이 name은 런타임에 결정됩니다.

하지만 메세지 패싱 프로그램에서 다른 큐를 액세스하는 스레드는 같은 코드 블록을 사용(Enqueue, Dequeue)하므로 name을 설정한 critical 디렉티브만으로는 충분하지 않습니다.

 

이 문제의 대안이 바로 잠금(lock)을 사용하는 것입니다. 이는 크리티컬 섹션에서 개발자가 상호 배제를 명시적으로 할 수 있도록 omp의 자료구조와 함수로 구성되어 있습니다. 이 잠금은 pthreads의 뮤텍스나 세마포어와 거의 동일하다고 볼 수 있습니다. 

typedef struct queue_s {
#ifdef USE_OMP_LOCK
    omp_lock_t lock;
#endif
    int enqueued;
    int dequeued;
    QNode* front_p;
    QNode* tail_p;
} Queue;

OpenMP에서는 두 가지 종류의 lock을 제공하는데, 그 중 하나인 simple locks만 살펴보겠습니다. 이 lock은 omp_lock_t 타입의 자료구조를 선언하여 사용할 수 있으며, 함께 사용되는 함수는 다음과 같습니다.

void omp_init_lock(omp_lock_t* lock);
void omp_set_lock(omp_lock_t* lock);
void omp_unset_lock(omp_lock_t* lock);
void omp_destroy_lock(omp_lock_t* lock);

이 omp_lock_t와 함수들을 사용하여 스레드가 크리티컬 섹션에 진입하기 전에 omp_set_lock 함수를 호출하여 lock을 set합니다. 만약 크리티컬 섹션 내 코드를 실행하는 스레드가 없다면 lock을 얻어서 크리티컬 섹션을 진행하고 다른 스레드가 실행하고 있다면 대기합니다. 그리고 스레드가 크리티컬 섹션의 코드를 모두 수행하고 나면 omp_unset_lock 함수를 호출하여 lock을 unset하고 다른 스레드가 이 lock을 얻을 수 있도록 해줍니다.

 

사용법은 pthreads와 유사합니다. 아마 코드만 살펴보면 충분히 이해하실 것이라고 생각합니다.

omp_lock_t와 함수들을 사용하는 버전은 컴파일할 때 -DUSE_OMP_LOCK 옵션을 추가해주면 됩니다. 옵션을 추가하면 critical 디렉티브 대신 OpenMP lock을 사용합니다.

 

void Send_msg(Queue* msg_queues[], int my_rank, int thread_count, int msg_number)
{
    int msg = rand() % MAX_MSG;
    int dest = rand() % thread_count;
#ifndef USE_OMP_LOCK
#pragma omp critical
    Enqueue(msg_queues[dest], my_rank, msg);
#else
    Queue* msg_queue = msg_queues[dest];
    omp_set_lock(&msg_queue->lock);
    Enqueue(msg_queue, my_rank, msg);
    omp_unset_lock(&msg_queue->lock);
#endif
#ifdef DEBUG
    printf("Thread %d > sent %d to %d\n", my_rank, msg, dest);
#endif
}

void Try_receive(Queue* q, int my_rank)
{
    int src, msg;
    int q_size = q->enqueued - q->dequeued;

    if (q_size == 0)
        return;
    else if (q_size == 1) {
#ifndef USE_OMP_LOCK
#pragma omp critical
        Dequeue(q, &src, &msg);
#else
        omp_set_lock(&q->lock);
        Dequeue(q, &src, &msg);
        omp_unset_lock(&q->lock);
#endif
    }
    else
        Dequeue(q, &src, &msg);
#ifdef DEBUG
    printf("Thread %d > received %d from %d\n", my_rank, msg, src);
#endif
}

메세지 패싱 프로그램에서 Send_msg 함수와 Try_receive 함수 내에서 omp_lock_t는 위와 같이 사용할 수 있습니다.

 

다만, 두 버전으로 성능을 비교해봐도 드라마틱한 차이는 없었고, 거의 비슷한 수행 시간을 보여주고 있습니다.


critical / atomic / lock ?

크리티컬 섹션에서 상호 배제를 할 수 있는 3가지의 메커니즘에 대해서 알아봤습니다. 그렇다면 어떤 경우에 어떤 방법이 더 좋을까요? 

일반적으로 atomic 디렉티브가 상호 배제를 확보하는 가장 빠른 방법입니다. 따라서 크리티컬 섹션이 atomic 디렉티브를 사용할 수 있는 조건을 만족한다면 다른 방법보다 atomic 디렉티브를 사용하는 것이 적어도 성능면에서는 좋습니다. 다만, OpenMP 표준을 살펴보면 atomic 디렉티브는 프로그램 내에 있는 모든 atomic 디렉티브에 대해서 상호 배제를 수행합니다. 이는 name을 사용하지 않은 critical 디렉티브의 형태와 같습니다. 예를 들어, 아래의 두 블록이 atomic 디렉티브로 크리티컬 섹션으로 설정되어 있다면, x와 y가 서로 관련없는 메모리 위치에 있다고 하더라도 하나의 스레드가 x++를 실행한다면 어떠한 스레드도 y++을 동시에 실행할 수 없습니다.

따라서, atomic 디렉티브로 보호받는 여러개의 크리티컬 섹션이 있다면 name을 사용한 critical 디렉티브나 lock을 사용해야 합니다. 

댓글