본문 바로가기
프로그래밍/병렬프로그래밍

[pthread] Thread 동기화

by 별준 2021. 11. 18.

References

  • An Introduction to Parallel Programming

Contents

  • Semaphore (세마포어)
  • Producer-Consumer Synchronization

 

각 스레드는 다른 스레드로 '메세지 전송'을 수행하고 전송받은 메세지를 출력하는 프로그램을 작성한다고 가정해봅시다. 예를 들어, t개의 스레드가 있을 때, 스레드 0은 스레드 1로 메세지를 전송하고 스레드 1은 스레드 2로 메세지를 전송합니다. 즉, 스레드 t-2는 스레드 t-1에게 메세지 전송하고 스레드 t-1은 스레드 0에게 메세지를 전송합니다. 그리고 스레드는 메세지를 수신한 후에 수신받은 메세지를 출력하고 스레드를 종료합니다.

 

메세지 전송을 구현하기 위해서 char* 타입의 공유 배열을 할당하고, 각 스레드 함수에서 전송하려는 메세지를 위한 공간을 할당하고 메세지를 초기화한 후에 그 메세지의 주소를 공유 배열의 포인터로 설정하도록 합니다.

이러한 내용으로 Send_msg 함수를 작성하면 아래처럼 작성할 수 있습니다.

void* Send_message(void* rank)
{
    long my_rank = (long)rank;
    long dest = (my_rank + 1) % thread_count;
    long src = (my_rank + thread_count - 1) % thread_count;
    char* my_msg = (char*)malloc(MSG_MAX * sizeof(char));

    sprintf(my_msg, "Hello to %ld from %ld", dest, my_rank);
    messages[dest] = my_msg;

    if (messages[my_rank] != NULL)
        printf("Thread %ld > %s\n", my_rank, messages[my_rank]);
    else
        printf("Thread %ld > No message from %ld\n", my_rank, src);

    return NULL;
}

여기서 messages는 char** 타입이며 main문에서 할당됩니다.

전체 코드는 아래 링크에서 확인하실 수 있습니다.

https://github.com/junstar92/parallel_programming_study/blob/master/pthread/06_pth_message.c

 

GitHub - junstar92/parallel_programming_study

Contribute to junstar92/parallel_programming_study development by creating an account on GitHub.

github.com

 

두 개 이상의 스레드를 실행할 때 몇 개의 메세지는 수신되지 못하는 것을 확인할 수 있습니다. 이는 당연한 현상입니다. 스레드 0은 스레드 t-1이 메세지를 공유 메세지 배열에 복사하기 전에 완료된 것이죠. 이전 포스팅에서 busy waiting에 대해서 알아봤는데, 메세지를 출력하기 전에 while문(line 11)을 추가하면 문제를 해결할 순 있습니다.

void* Send_message(void* rank)
{
    long my_rank = (long)rank;
    long dest = (my_rank + 1) % thread_count;
    long src = (my_rank + thread_count - 1) % thread_count;
    char* my_msg = (char*)malloc(MSG_MAX * sizeof(char));

    sprintf(my_msg, "Hello to %ld from %ld", dest, my_rank);
    messages[dest] = my_msg;

	while (messages[my_rank == NULL);
    if (messages[my_rank] != NULL)
        printf("Thread %ld > %s\n", my_rank, messages[my_rank]);
    else
        printf("Thread %ld > No message from %ld\n", my_rank, src);

    return NULL;
}

busy waiting을 추가한 실행 결과

 

하지만, 이 솔루션은 busy waiting의 문제점을 그대로 가지고 있으므로, 다른 방법이 필요합니다.

...
messages[dest] = my_msg;
// Notify thread dest that it can proceed;

// Await notification from source thread;
printf("Thread %ld > %s\n", my_rank, messages[my_rank]);

위와 같이 저장한 메세지를 전달하는 dest 스레드에게 알려주고, source 스레드로부터 알림을 받는 작업이 필요합니다.

 

뮤텍스를 사용해도 확실하게 이 문제를 해결할 순 없습니다. 아래의 코드에서 두 개의 스레드가 실행되었다고 가정해봅시다.

만약 스레드 0이 스레드 1이 line 2에 도달하기 전에 line 7의  두 번째 pthread_mutex_lock을 호출한다면, 결국 스레드 0은 null 포인터인 메세지를 참조하게 되고, 프로그램은 종료됩니다.

 


Semaphore

POSIX에는 크리티컬 섹션의 액세스를 제어하는 다른 방법을 제공하는데, 바로 Semaphore(세마포어) 입니다.

세마포어는 unsigned int의 특별한 타입이라고 생각하면 됩니다. 따라서 세마포어의 값은 0, 1, 2, ...과 같습니다. 이 값은 크리티컬 섹션에 액세스할 수 있는 스레드의 개수가 되고 일반적으로는 0, 1의 값만 사용합니다. 즉, 세마포어의 값이 0일 때에는 뮤텍스를 lock한 것이고, 1인 경우에는 뮤텍스를 unlock한 것과 동일합니다. (0과 1만 사용하는 세마포어를 binary semaphore라고 합니다.)

 

세마포어를 사용하기 위해서는 <semaphore.h> 헤더를 include 해야합니다.

그리고 세마포어를 사용하기 위한 함수들은 다음과 같습니다.

int sem_init(
	sem_t*    semaphore_p /* out */,
    int       shared      /* in  */,
    unsigned  initial_val /* in  */);
int sem_destroy(sem_t* semaphore_p /* in/out */);
int sem_post(sem_t* semaphore_p /* in/out */);
int sem_wait(sem_t* semaphore_p /* in/out */);

 

sem_init과 sem_destroy 함수를 통해 초기화/소멸을 수행합니다.

sem_init의 두 번째 인수인 shared는 그 값이 0이면 세마포어가 프로세스 안에서 스레드끼리 공유되고, 그 외의 값이면 프로세스끼리 공유가 되도록 합니다. 따라서 예제에서는 0으로 설정하면 됩니다. 마지막 인수는 세마포어가 가지는 초기 값을 설정하는 것이며 처음에는 아무도 접근하지 못하도록 세마포어 값을 0으로 설정합니다.

 

그리고 sem_post와 sem_wait 함수 호출을 통해서 세마포어의 값을 증가시키거나 감소시킬 수 있습니다.

세마포어를 사용하는 Send_msg 함수를 살펴보도록 하겠습니다.

void* Send_message(void* rank)
{
    long my_rank = (long)rank;
    long dest = (my_rank + 1) % thread_count;
    char* my_msg = (char*)malloc(MSG_MAX * sizeof(char));

    sprintf(my_msg, "Hello to %ld from %ld", dest, my_rank);
    messages[dest] = my_msg;
    sem_post(&semaphores[dest]); // increase semaphores[dest] by 1 -> 'unlock' the semaphore of dest

    sem_wait(&semaphores[my_rank]); // decrease semaphores[my_rank] by 1 and return -> wait for our semaphore to be unlocked
    printf("Thread %ld > %s\n", my_rank, messages[my_rank]);

    return NULL;
}

처음에 모든 세마포어의 초기값이 0이므로 전부 lock 되어 있는 상태입니다. 각 스레드에서는 전송할 메세지를 초기화하면 그때서야 dest 스레드의 세마포어인 semaphores[dest]의 값을 1 증가시킵니다. 이때, 이 작업이 sem_post 함수 호출을 통해서 이루어집니다. 그리고 sem_wait 함수를 통해서 현재 스레드의 세마포어인 semaphores[my_rank]의 값을 1 감소시키면서 출력을 수행합니다. 하지만, source 스레드에서 아직 메세지를 전달받지 못했으면 semaphores[my_rank]의 값은 아직 0이기 때문에 아직 출력할 준비가 되지 않았습니다. 이때, sem_wait를 호출하면 스레드는 semaphores[my_rank]가 unlock될 때까지 대기 상태가 됩니다. source 스레드에서 메세지를 전달받았다면, semaphores[my_rank]의 값은 1일 것이고, sem_wait 함수 호출을 통해서 이 값을 1 감소시키면서 크리티컬 섹션에 진입할 수 있게 되어 정상적으로 메세지를 출력하게 됩니다.

 

main 함수에서 세마포어의 초기화/소멸 함수는 아래의 main 함수에서 확인하실 수 있습니다.

sem_t* semaphores;

int main(int argc, char* argv[])
{
    pthread_t* thread_handles;

    if (argc != 2) {
        fprintf(stderr, "Usage: %s <number of threads>\n", argv[0]);
        exit(0);
    }

    thread_count = strtol(argv[1], NULL, 10);
    if (thread_count <= 0 || thread_count > MAX_THREADS) {
        fprintf(stderr, "The number of threads should be > 0 and < %d\n", MAX_THREADS);
        exit(0);
    }

    thread_handles = (pthread_t*)malloc(thread_count * sizeof(pthread_t));
    messages = (char**)malloc(thread_count * sizeof(char*));
    semaphores = (sem_t*)malloc(thread_count * sizeof(sem_t));

    for (long thread = 0; thread < thread_count; thread++) {
        messages[thread] = NULL;
        sem_init(&semaphores[thread], 0, 0);
    }
    for (long thread = 0; thread < thread_count; thread++)
        pthread_create(&thread_handles[thread], NULL, Send_message, (void*)thread);
    for (long thread = 0; thread < thread_count; thread++)
        pthread_join(thread_handles[thread], NULL);
    
    for (long thread = 0; thread < thread_count; thread++) {
        free(messages[thread]);
        sem_destroy(&semaphores[thread]);
    }
    free(messages);
    free(semaphores);
    free(thread_handles);

    return 0;
}

전체 코드는 아래 링크를 참조해주세요!

https://github.com/junstar92/parallel_programming_study/blob/master/pthread/07_pth_message_sem.c

 

GitHub - junstar92/parallel_programming_study

Contribute to junstar92/parallel_programming_study development by creating an account on GitHub.

github.com

 

세마포어를 사용하여 메세지 전송 및 출력 프로그램을 실행해보면, 정상적으로 모든 스레드에서 메세지를 수신받아서 출력하는 것을 확인할 수 있습니다. 물론 실행할 때마다 출력 순서는 랜덤입니다.

 

사실 위에서 작성한 메세지 전송 프로그램은 크리티컬 섹션을 포함하지 않습니다. 단순히 메세지를 전송하는 스레드와 메세지를 수신하는 스레드의 동기화를 맞추기 위해서 세마포어를 사용했지, Send_msg 함수에는 한 번에 하나의 스레드만이 실행할 수 있는 코드의 블록이 없습니다.

오히려 my_rank 스레드는 source 스레드에서 메세지 전송이 끝날 때까지 출력을 진행하지 않습니다. 이러한 타입의 동기화는 한 스레드가 다른 스레드가 어떠한 액션을 수행할 때까지 작업을 수행하지 않습니다. 이러한 형태의 동기화를 Producer-Consumer Synchronization(생산자-소비자 동기화)라고 합니다.

보통 아래의 경우처럼 사용되는데, 이번 예제에는 처음부터 아무도 소유권을 가질 수 없도록 lock이 되어 있고 메세지 전송이 완료되면 unlock하는 방법으로 사용되었습니다.
sem_wait(&semaphore);
/* critical section */
...
/* end of critical section */
sem_post(&semaphore);​

 

다음 글에서 스레드 동기화에 대해서 조금 더 살펴보도록 하겠습니다 !

 

+)

생산자-소비자 동기화는 전에 C++을 공부하면서 포스팅한 적이 있는데, 참조하시면 내용을 이해하는데 큰 도움이 될 거라고 생각됩니다 !

2021.08.09 - [프로그래밍/C & C++] - [C++] 생산자(Producer) / 소비자(Consumer) 패턴

 

[C++] 생산자(Producer) / 소비자(Consumer) 패턴

References 씹어먹는 C++ (https://modoocode.com/270) Contents 생산자 / 소비자 패턴 Producer - Consumer 패턴 예시 생산자(Producer) - 소비자(Consumer) 패턴은 멀티 쓰레드 프로그램에서 자주 사용되는 패턴..

junstar92.tistory.com

 

댓글