본문 바로가기
프로그래밍/병렬프로그래밍

[pthread] 배리어(Barrier)와 조건 변수(Condition Variable)

by 별준 2021. 11. 19.

References

  • An Introduction to Parallel Programming

Contents

  • 배리어(Barrier)
  • 조건 변수(Condition Variable)

2021.11.18 - [프로그래밍/병렬프로그래밍] - [pthread] Thread 동기화

이전 포스트에서 메세지 전송 프로그램을 작성하며 스레드 동기화에 대해서 알아봤습니다.

프로그램에서 스레드들이 동일한 포인트에 있는지 확인하여 동기화할 수 있었습니다. 어떠한 스레드도 모든 스레드가 이 포인트에 도달할 때까지 진행될 수 없으므로 동기화의 이런 포인트 지점을 배리어(Barrier)라고 합니다.

 

배리어는 여러 어플리케이션에서 다양하게 사용됩니다. 이전 MPI 프로그래밍에서 성능 측정을 위해 각 스레드의 수행 시간을 측정할 때, 모든 스레드가 같은 시점에 알고리즘을 실행할 수 있도록 타이밍을 맞추려고 MPI_Barrier 함수를 호출한 적이 있습니다.

(참고 :

2021.11.12 - [프로그래밍/병렬프로그래밍] - [MPI] 행렬 - 벡터 곱 연산 + 성능 평가)

 

이처럼 스레드가 수행한 시간을 측정할 때, 코드의 시작 타이밍을 맞추기 위해서 사용되었습니다.

이를 위해 다음과 같이 코드를 작성했었습니다.

이 방법을 사용하면 모든 스레드가 같은 시간에 my_start를 기록하게 됩니다.

 

배리어를 사용하는 다른 중요한 이유는 바로 디버깅때문입니다. 여러 병렬 프로그램에서 봤던 것처럼 병렬 프로그램에서는 에러가 언제 발생할 지 알기가 어렵습니다. 물론 프로그램에서 각 스레드가 어떤 지점에 도착했는지 메세지를 출력할 수는 있지만, 이런 방법은 출력하는 메세지가 너무 많아서 확인하기는 힘듦니다. 이때, 배리어를 사용하면 이런 문제를 해결할 수 있습니다.

 

이번 포스팅에서 배리어를 직접 구현해보겠습니다. 구현할 수 있는 여러 가지 방법이 있는데, 3가지 방법을 알아볼 것입니다.

첫 번째 방법은 이전 글들에서 이미 알아봤던 busy waiting과 mutex를 사용하는 방법입니다.

 

총 100번의 루프를 도는데, 한 번의 루프에서 각 스레드가 모두 도착해야 다음 루프로 넘어가는 프로그램을 작성하고, 루프를 전부 수행하는데 걸리는 시간을 측정하도록 하겠습니다.


busy waiting과 mutex를 사용한 배리어

busy waiting과 mutex를 사용하면 직관적으로 구현할 수 있습니다. 뮤텍스에 의해서 보호되는 공유 카운터를 사용하는데, 이 공유 카운터는 각 스레드가 포인트에 도착했을 때 1을 증가시킵니다. 만약 공유 카운터의 값이 스레드의 수와 동일하다면 모든 스레드가 어느 포인트에 모두 도달했다는 뜻이고, 다음 루프로 넘어갈 수 있습니다.

 

코드는 다음과 같이 작성할 수 있습니다. 

#define BARRIER_COUNT 100
int barrier_thread_counts[BARRIER_COUNT];

void* Thread_work(void* rank)
{
    for (int i = 0; i < BARRIER_COUNT; i++) {
        pthread_mutex_lock(&barrier_mutex);
        barrier_thread_counts[i]++;
        pthread_mutex_unlock(&barrier_mutex);
        while (barrier_thread_counts[i] < thread_count);
    }

    return NULL;
}

barrier_thread_counts 배열은 각 루프에서 도착한 스레드의 수입니다. 이 배열은 메인 스레드에서 모두 0으로 초기화된 후에 사용됩니다. 스레드 함수가 수행되고 나서 루프에 진입하면, 우선 뮤텍스 lock을 호출하고, 공유 카운터를 증가시킵니다. 그 후에 다시 뮤텍스를 unlock하고, 모든 스레드가 이 포인트에 도달할 때까지 line 10의 busy waiting에 진입합니다. 모든 스레드가 이 포인트에 도달하게 되면, barrier_thread_counts[i]의 값이 thread_count가 되기 때문에 각 스레드는 while 루프를 벗어나며 다음 루프에 진입합니다. 이 과정을 100번 반복합니다.

 

전체 코드는 아래 링크에서 확인하실 수 있습니다.

https://github.com/junstar92/parallel_programming_study/blob/master/pthread/08_pth_busy_barrier.c 

 

GitHub - junstar92/parallel_programming_study

Contribute to junstar92/parallel_programming_study development by creating an account on GitHub.

github.com

 

-DDEBUG 옵션을 추가하여 컴파일하고, 실행하면

위의 출력을 확인하실 수 있습니다. 약 0.001 sec가 소요되는데, 이는 출력으로 인해서 약간 성능이 안나온 것이고, -DDEBUG 옵션을 끄고 컴파일한 후에 똑같이 실행하면,

약 0.0003 sec의 시간이 걸리게 됩니다.

 

이 구현은 이전 포스트에서 살펴봤던 busy waiting의 문제를 그대로 가지고 있습니다. 스레드가 while 루프를 반복해서 체크하는 동안 CPU 사이클을 소모하게 되며, 따라서 프로그램의 성능이 감소하게 됩니다. 스레드를 4개만 사용했을 때는 큰 로드가 걸리지 않기 때문에 빠르게 끝이 났지만, 코어보다 더 많은 스레드로 실행하면 수행 시간은 급격하게 증가합니다.

thread_count = 64 일 때, 수행 시간: 약 5.978 sec

 

문제는 아니지만 또 살펴볼 것은 공유 변수를 배열로 정의하여 각 루프에서는 각각의 카운터를 따로 사용하도록 했다는 것입니다. 만약 배열이 아닌 하나의 카운터로 위 코드를 수행한다면, 마지막 스레드가 포인트에 도달한 후에 counter를 0으로 다시 초기화해주어야 합니다. 하지만 마지막 스레드가 counter를 0으로 초기화한다면, busy waiting 중에 있는 스레드들은 counter == thread_count 조건을 만족할 수 없으므로, busy waiting을 빠져나올 수가 없게 됩니다.

 


Semaphore를 사용한 배리어

세마포어를 사용해서 배리어를 구현할 수도 있습니다. 세마포어를 사용한다면, busy waiting에서 마주했던 문제를 줄일 수 있을까요?

 

세마포어로 Thread_work를 구현하려면, 2개의 세마포어를 사용해야 합니다. count_sem과 barrier_sems(배열)을 사용할 것인데, count_sem은 카운터를 보호하는데 사용되고, barrier_sems는 배리어로 진입하는 스레드를 block시키는 데 사용됩니다. 

우선 루프를 수행하는 스레드 함수는 다음과 같습니다.

long counter;
sem_t barrier_sems[BARRIER_COUNT];
sem_t count_sem;
void* Thread_work(void* rank)
{
    for (int i = 0; i < BARRIER_COUNT; i++) {
        sem_wait(&count_sem);
        if (counter == thread_count - 1) {
            counter = 0;
            sem_post(&count_sem);
            for (int j = 0; j < thread_count - 1; j++)
                sem_post(&barrier_sems[i]);
        }
        else {
            counter++;
            sem_post(&count_sem);
            sem_wait(&barrier_sems[i]);
        }
    }

    return NULL;
}

count_sem은 메인 스레드에서 1로 초기화되어, unlock된 상태에서 스레드가 시작됩니다. barrier_sems는 모두 0으로 초기화되어 처음에는 아무도 사용할 수 없는 상태인 lock된 상태로 스레드가 시작됩니다. 첫 번째 스레드가 line 7의 sem_wait(&count_sem)를 호출하여 크리티컬 섹션에 진입하고, 카운터를 1 증가(line 15)시킨 후에 sem_post(&count_sem)을 호출(line 16)하여 다시 count_sem을 unlock하고 line17의 sem_wait(&barrier_sems[i])를 호출하여 barrier_sems[i]를 lock하려고 시도합니다. 하지만 barrier_sems는 이미 lock된 상태로 초기화되었기 때문에, barrier_sems가 unlock되어 액세스가 허용될 때까지 block 됩니다.

마지막 스레드가 남을 때까지 위 과정을 반복하고, 각 스레드들은 block 상태에 빠지게 됩니다. 마지막 스레드에서는 counter == thread_count - 1을 만족하므로 line 9로 진입하게 되고, 카운트를 초기화해줍니다. 그리고 line 11-12에서 sem_post(&barrier_sems[i])를 thread_count - 1만큼 호출하면서 barrier_sems[i]의 값을 1씩 증가시켜줍니다. 이 값이 증가되면 다른 스레드에서는 이제 배리어에 진입하여 스레드 함수를 수행하게 됩니다. 즉, line 17의 sem_wait(&barrier_sems[i])에서 세마포어의 값이 0보다 크기 때문에 1씩 감소시키면서 리턴되어 스레드 함수를 진행하게 됩니다.

 

세마포어로 구현한 배리어는 스레드가 sem_wait에서 block되어 있는 동안 CPU 사이클을 소비하지 않기 때문에 busy waiting를 사용한 구현보다는 낫다고 할 수 있습니다. 하지만, 카운터는 재사용할 수 있지만 barrier_sems은 재사용이 불가능하기 때문에 각 스레드에서 사용할 barrier_sem을 위해서 배열로 정의하여 사용합니다.

 

컴파일 후 실행하면

위의 결과를 얻을 수 있었습니다. 64개의 스레드의 수행 시간은 약 0.093초로 busy waiting보다 훨씬 좋은 성능을 보여주고 있습니다.

 

전체 코드는 아래 링크를 참조하시기 바랍니다 !

https://github.com/junstar92/parallel_programming_study/blob/master/pthread/09_pth_sem_barrier.c

 

GitHub - junstar92/parallel_programming_study

Contribute to junstar92/parallel_programming_study development by creating an account on GitHub.

github.com

 


조건 변수(Condition Variable)을 사용한 배리어

Pthreads에서 배리어를 생성하는 더 좋은 방법은 바로 조건 변수(Condition Variable)을 사용하는 것입니다. 조건 변수는 스레드가 특정 이벤트나 조건이 발생할 때까지 실행을 정지(suspend)하는 데이터 객체입니다. 이벤트나 조건이 발생하면, 다른 스레드는 정지되어 있는 스레드에게 'wake up' 시그널을 보내줍니다. 조건 변수는 항상 뮤텍스와 같이 사용되므로, 조건 변수를 사용할 때에는 뮤텍스도 같이 사용됩니다.

 

의사코드로 표현하면 조건 변수는 다음과 같이 사용될 수 있습니다.

여기서 else 조건에서 조건에 만족하지 않아 스레드가 block될 때에는 mutex의 lock을 풀고, 정지 상태에 빠지게 됩니다. 그리고 다른 스레드에 의해서 'wake up' 시그널을 받아서, 깨어날 때에는 다시 mutex를 lock하고 스레드가 다시 수행됩니다.

 

Pthreads에서 조건 변수는 pthread_cond_t 타입으로 정의할 수 있습니다. 그리고, 뮤텍스처럼 초기화되고 삭제되어야 합니다. 이는 pthread_cond_initpthread_cond_destroy 함수 호출로 할 수 있습니다.

int pthread_cond_init(pthread_cond_t* cond_p,
                      const pthread_condattr_t* cond_attr_p);
int pthread_cond_destroy(pthread_cond_t* cond_p);

그리고 조건 변수와 사용되는 함수는 pthread_cond_signal, pthread_cond_broadcast, pthread_cond_wait 가 있습니다.

int pthread_cond_signal(pthread_cond_t* cond_var_p);
int pthread_cond_broadcast(pthread_cond_t* cond_var_p);
int pthread_cond_wait(pthread_cond_t* cond_var_p,
                      pthread_mutex_t* mutex_p);

pthread_cond_signal 함수는 block된 스레드 중 하나에 'wake up' 시그널을 보내 그 스레드를 unblock 합니다.

pthread_cond_broadcast 함수는 block된 스레드 전부를 unblock 합니다.

pthread_cond_wait 함수는 mutex_p에 의해서 참조되는 뮤텍스를 unlock하고 pthread_cond_signal이나 pthread_cond_broadcast의 호출로 인해 unblock될 때까지 해당 스레드를 block 시킵니다. pthread_cond_signal이나 pthread_cond_broadcast로 인해 unblock될 때, 다시 뮤텍스를 lock해주고 스레드를 계속 진행합니다.

그렇기 때문에, pthread_cond_wait를 의사코드로 표현하면 다음과 같다고 볼 수 있습니다.

 

조건 변수를 사용하여 배리어를 구현한 스레드 함수는 다음과 같이 작성할 수 있습니다.

void* Thread_work(void* rank)
{
    for (int i = 0; i < BARRIER_COUNT; i++) {
        pthread_mutex_lock(&barrier_mutex);
        barrier_thread_count++;

        if (barrier_thread_count == thread_count) {
            barrier_thread_count = 0;
            pthread_cond_broadcast(&ok_to_proceed);
        }
        else {
            // Wait unlocks mutex and puts thread to sleep.
            //    Put wait in while loop in case some other
            // event awakens thread.
            while (pthread_cond_wait(&ok_to_proceed, &barrier_mutex) != 0);
            // Mutex is relocked at this point.
        }
        pthread_mutex_unlock(&barrier_mutex);
    }

    return NULL;
}

여기서 pthread_cond_wait가 while 루프 위에서 호출되는데, 이는 pthread_cond_broadcast나 pthread_cond_signal의 호출이 아닌 다른 이벤트에 의해서 unblock될 수 있기 때문입니다. 만약 다른 이벤트로 인하여 unblock되면 리턴값은 0이 아닌 값이 되는데, 다른 이벤트에 의해서 unblock되면 우리가 의도한 동작이 아니기 때문에 다시 block 해주기 위하여 while 루프 위에서 호출하도록 작성하였습니다. 

마지막 스레드가 mutex를 얻기 전까지, 나머지 스레드들은 mutex를 얻고 barrier_thread_count를 1씩 증가시키면서 pthread_cond_wait를 호출하며 block상태가 됩니다. 그리고 마지막 스레드가 mutex를 얻고 barrier_thread_count를 1 증가시키면, barrier_thread_count == thread_count를 만족하게 되고, barrier_thread_count를 초기화한 후에 pthread_cond_broadcast를 호출하여 다른 스레드들이 모두 unblock되도록 합니다. 

 

이렇게 구현한 코드를 컴파일 후 실행하여

스레드를 수행하는데, 약 0.146 sec의 시간이 걸렸습니다. 의외로 세마포어를 사용한 것보다 시간이 더 걸렸습니다.

 

스레드의 개수를 더 증가시켜도,

세마포어로 구현한 배리어 프로그램의 결과가 조금 더 성능이 좋습니다.

 


+) POSIX barrier가 구현되어 있는 것을 확인했습니다. pthread_barrier_t와 관련된 함수들을 사용해 배리어를 구현할 수 있었습니다.

void* Thread_work(void* rank)
{
#ifdef DEBUG
    long my_rank = (long)rank;
#endif

    for (int i = 0; i < BARRIER_COUNT; i++) {
        pthread_barrier_wait(&barrier);
#ifdef DEBUG
        if (my_rank == 0) {
            printf("All threads completed barrier %d\n", i);
            fflush(stdout);
        }
#endif
    }

    return NULL;
}

간단하게 위와 같이 사용할 수 있는 것 같고, 실행도 잘 됩니다.

다만 아직 멀티스테이지에서의 예제를 발견하지 못해서 이게 확실한 방법인지 약간의 의심이 있습니다.

(barrier를 배열로 정의하여 각 스테이지별로 따로 사용해보고, 하나의 barrier로도 실행해봤지만 둘 다 정상적으로 스레드 함수가 돌아가는 것을 확인했습니다.

 

세마포어의 성능보다 아주 약간 좋은 것 같습니다.

 

코드는 아래 링크에서 확인 가능합니다.

https://github.com/junstar92/parallel_programming_study/blob/master/pthread/11_pth_posix_barrier.c

 

GitHub - junstar92/parallel_programming_study

Contribute to junstar92/parallel_programming_study development by creating an account on GitHub.

github.com

 

댓글