본문 바로가기
프로그래밍/C & C++

[C++] 비동기(Asynchronous) 실행

by 별준 2021. 8. 14.

References

Contents

  • std::future, std::promise
  • std::shared_future
  • std::packaged_task
  • std::async

앞선 글들에서 이야기했던 쓰레드나 생성자-소비자 패턴의 경우을 사용하는 경우는 결국 프로그램의 실행이 한 갈래가 아닌 여러 갈래로 갈라져서 동시에 진행되어서 CPU를 조금 더 효율적으로 사용하기 위해서입니다.

즉, 프로그램을 비동기적(asynchronous) 실행을 하기 위해서 입니다.

 

이번 글에서는 이러한 비동기적 실행을 간단하게 구현할 수 있도록 도와주는 도구/기능들에 대해서 알아보겠습니다.


std::future / std::promise

비동기적 실행을 통해서 하고 싶은 일은, 어떠한 데이터를 다른 쓰레드를 통해 처리하고, 그 처리된 데이터를 받아내는 것이라고 볼 수 있습니다.

 

즉, 처리해야되는 데이터를 미래에(future) 다시 돌려받겠다라는 약속(promise)라고 볼 수 있습니다.

이 것을 코드로 변환하면 다음과 같습니다.

#include <iostream>
#include <string>
#include <future>
#include <thread>

void worker(std::promise<std::string>* p) {
	// 약속을 이행함. 해당 결과는 future에 들어가게 됨
	p->set_value("some data");
}

int main(void) {
	std::promise<std::string> p;

	// 미래에 string 데이터를 받는다는 약속
	std::future<std::string> data = p.get_future();

	std::thread t(worker, &p);

	// 약속된 데이터를 받을 때까지 기다림
	data.wait();

	// wait가 리턴됬다는 것은 future에 데이터가 준비되었다는 의미
	// 참고로, wait없이 get만 사용해도 wait 한 것과 동일
	std::cout << "받은 데이터 : "  << data.get() << "\n";

	t.join();

	return 0;
}

컴파일 후 실행하면 다음의 출력 결과를 확인할 수 있습니다.

코드를 살펴보도록 합시다.

먼저, promise 객체는 다음과 같이 사용합니다.

std::promise<std::string> p;

promise 객체를 정의할 때에는 작업을 수행한 후에 돌려줄 객체의 타입을 템플릿 인자로 받고, 여기서는 string 객체를 돌려줄 것이므로 string을 전달했습니다. 

작업이 끝난 후에 promise 객체는 자신이 가지고 있는 future 객체에 값을 넣어주게 됩니다. 이때 자신이 가지고 있는 future 객체를 설정하는 것은

std::future<std::string> data = p.get_future();

위와 같이 get_future 함수를 통해서 설정할 수 있습니다. 다만, 이 상태로는 data가 아직은 promise가 작업 후에 돌려줄 결과는 포함하고 있지 않습니다. 

promise 객체의 작업 후에 data가 실제 결과를 전달받기 위해서는

p->set_value("some data");

위와 같이 promise 객체가 set_value를 통해서 자신이 가지고 있는 future 객체에 데이터를 전달해야합니다.

 

promise에 대응되는 future 객체는 promise가 전달한 데이터를 get 함수를 통해서 얻을 수 있습니다.

// 약속된 데이터를 받을 때까지 기다림
data.wait();

// wait가 리턴됬다는 것은 future에 데이터가 준비되었다는 의미
// 참고로, wait없이 get만 사용해도 wait 한 것과 동일
std::cout << "받은 데이터 : "  << data.get() << "\n";

여기서 중요한 것은 값을 전달받기 전까지 wait 함수를 통해서 기다린다는 것인데, wait 함수가 리턴한다면 그제서야 get 함수를 통해서 future에 전달된 객체(데이터)를 얻을 수 있습니다. 주석에 설명했듯이 굳이 wait 함수를 호출할 필요는 없습니다. get 함수만 사용하더라도 promise에서 future로 객체(데이터)를 전달할 때까지 기다리게 됩니다. 즉, 위 코드에서 data.wait()는 없어도 됩니다.

 

참고로 get 함수를 호출하게 되면 future 내에 전달받은 객체가 이동됩니다. 따라서 get을 다시 호출하면 안됩니다.

 

std::promise와 std::future를 정리하자면, promise는 생산자-소비자 패턴에서 생산자의 역할을 수행하고, future는 소비자의 역할을 수행한다고 볼 수 있습니다.

그렇기 때문에 이러한 코드는 아래처럼 condition_variable을 사용해도 구현할 수 있습니다.

#include <iostream>
#include <string>
#include <thread>
#include <condition_variable>
#include <mutex>

std::condition_variable cv;
std::mutex m;
bool done = false;
std::string info;

void worker() {
	{
		std::unique_lock<std::mutex> lk(m);
		info = "some data";
		done = true;
	}

	cv.notify_all();
}
int main(void) {
	std::thread t(worker);

	std::unique_lock<std::mutex> lk(m);
	cv.wait(lk, []() { return done; });
	lk.unlock();

	std::cout << "받은 데이터 : " << info << std::endl;

	return 0;
}

하지만, promise-future을 사용하는 것이 조금 더 깔끔하고 이해도 잘 되는 편인 것 같습니다. 

그리고 condition variable을 사용한 것보다 더 유용한 점은 바로 future에 예외도 전달할 수 있다는 것입니다.

include <iostream>
#include <string>
#include <future>
#include <thread>

void worker(std::promise<std::string>* p) {
	try {
		throw std::runtime_error("Some Error");
	}
	catch (...) {
		// 예외 세팅
		p->set_exception(std::current_exception());
	}
}

int main(void) {
	std::promise<std::string> p;

	// 미래에 string 데이터를 받는다는 약속
	std::future<std::string> data = p.get_future();

	std::thread t(worker, &p);

	// 약속된 데이터를 받을 때까지 기다림
	data.wait();

	try {
		std::cout << "받은 데이터 : " << data.get() << "\n";
	}
	catch (const std::exception& e) {
		std::cout << "예외 : " << e.what() << "\n";
	}
	t.join();

	return 0;
}

컴파일 후 실행하면,

예외가 잘 전달된 것을 볼 수 있습니다.

참고로 set_exception 함수는 예외 객체가 아닌 exception_ptr을 전달해야하는데, 이것은 catch로 받은 예외 객체의 포인터가 아닌 현재 catch된 예외에 관한 정보를 반환하는 current_exception 함수가 리턴하는 객체입니다.

catch로 전달받은 예외 객체를 make_exception_ptr 함수를 통해서도 가능합니다.

void worker(std::promise<std::string>* p) {
	try {
		throw std::runtime_error("Some Error");
	}
	catch (std::exception e) {
		// 예외 세팅
		p->set_exception(std::make_exception_ptr(e));
	}
}

 

wait for

위 예시에서는 wait 함수를 통해서 promise가 future로 데이터 객체를 전달할 때까지 기다리게 했습니다. 하지만 wait_for을 사용하면, 정해진 시간 동안만 기다리고 그냥 진행할 수 있습니다.

#include <iostream>
#include <string>
#include <future>
#include <thread>
#include <chrono>

void worker(std::promise<void>* p) {
	std::this_thread::sleep_for(std::chrono::seconds(10));
	p->set_value();
}

int main(void) {
	std::promise<void> p;
	std::future<void> data = p.get_future();

	std::thread t(worker, &p);

	while (true) {
		std::future_status status = data.wait_for(std::chrono::seconds(1));

		if (status == std::future_status::timeout) {
			// 아직 데이터가 준비되지 않음
			std::cerr << ">";
		}
		else if (status == std::future_status::ready) {
			// promise에서 future로 데이터 설정
			break;
		}
	}
	t.join();

	return 0;
}

위 코드를 실행시키면,

'>' 를 1초당 하나씩 출력하다가 10초 후에 종료됩니다.

wait_for 함수는 promise가 설정될 때까지 기다리는 대신 인자로 전달된 시간만큼 기다렸다가 바로 리턴합니다. 이 때, 리턴하는 값은 현재 future 상태를 나타내는 future_status 객체입니다.

 

future_status 객체는 총 3 가지 상태를 가질 수 있는데, 

- future에 값이 설정됬을 때는 future_status::ready,

- wait_for에 지정한 시간이 지났지만 값이 설정되지 않아서 리턴됬을 때는 future_status::timeout

- 결과값을 계산하는 함수가 실행되지 않았다는 의미인 future_status::deferred

가 있습니다.

 

그리고 여기서 promise와 future에 void 객체를 템플릿 인자로 전달했는데, void인 경우에 아무런 객체도 전달하지는 않지만, future가 set이 되었는지의 유무로 판단하는 플래그의 역할을 수행할 수 있습니다.

 


shared_future

위에서 future의 경우 한 번만 get을 할 수 있다고 했습니다. 이는 get을 호출하게 되면 future 내부의 객체가 이동되기 때문입니다. 하지만, 종종 다른 쓰레드에서 future를 get할 필요성이 있습니다.

 

이 경우에는 shared_future를 사용하면 가능합니다. 달리기를 C++ 프로그램으로 나타낸 예제 코드를 살펴보겠습니다.

#include <iostream>
#include <string>
#include <future>
#include <thread>

void runner(std::shared_future<void>* start) {
	start->get();
	std::cout << "출발!\n";
}

int main(void) {
	std::promise<void> p;
	std::shared_future<void> start = p.get_future();

	std::thread t1(runner, &start);
	std::thread t2(runner, &start);
	std::thread t3(runner, &start);
	std::thread t4(runner, &start);

	std::cerr << "준비...";
	std::this_thread::sleep_for(std::chrono::seconds(1));
	std::cerr << "땅!\n";

	p.set_value();

	t1.join();
	t2.join();
	t3.join();
	t4.join();

	return 0;
}

위와 같이 4개의 쓰레드에서 하나의 future값이 공통으로 사용하여, 위의 출력 결과를 확인할 수 있습니다.

condition_variable을 사용하게 동일하게 작성할 수도 있지만, future를 사용하는 것이 훨씬 편리할 것입니다.


std::packaged_task

C++에서 위에서 설명한 promise-future 패턴을 비동기적 함수(정확히는 Callable 함수; 람다 함수, 함수 객체)의 리턴값에 간단히 적용할 수 있는 packaged_task라는 것을 지원하고 있습니다.

packaged_task에 전달된 함수(task2)가 리턴할 때, 그 리턴값을 promise에 set_value 하고, 만약 예외를 던졌다면 set_execption을 합니다. promise에 설정된 future는 packaged_task가 리턴하는 future에 접근할 수 있습니다.

#include <iostream>
#include <future>
#include <thread>

int some_task(int x) {
	return 10 + x;
}

int main(void) {
	// int(int) -> return type(arguments) int를 리턴 / int를 인자로 받는 함수
	// std::function 참조
	std::packaged_task<int(int)> task(some_task);

	std::future<int> start = task.get_future();

	std::thread t(std::move(task), 5);

	std::cout << "결과값 : " << start.get() << std::endl;
	t.join();

	return 0;
}

컴파일 후 실행하면 아래의 출력 결과를 확인할 수 있습니다.

packaged_task는 비동기로 실행할 함수 자체를 생성자의 인자로 받습니다. 또한, 템플릿 인자로 해당 함수의 리턴 타입과 파라미터 타입도 명시해야 합니다. packaged_task는 전달된 함수를 실행하고, 그 함수의 리턴값을 promise에 설정합니다.

std::packaged_task<int(int)> task(some_task);

std::future<int> start = task.get_future();

그리고 해당 promise에 설정된 future는 위와 같이 get_future 함수를 통해서 설정할 수 있습니다.

 

생성된 packaged_task는 쓰레드에 전달하면 되는데, packaged_task는 복사 생성이 불가능합니다. 때문에 명시적으로 move를 해주어야만 합니다.(promise도 동일)

std::thread t(std::move(task), 5);

 

이렇게 비동기로 실행된 함수의 결과값은 추후에 future의 get 함수로 받을 수 있습니다. 이처럼 packaged_task를 사용하게 된다면 쓰레드에 굳이 promise를 전달하지 않아도 알아서 packaged_task가 함수의 리턴값을 처리해주어서 매우 편리하게 사용할 수 있습니다.


std::async

위에서 promise나 packaged_task는 비동기 실행을 위해서, 쓰레드를 명시적으로 생성하여 실행해야 했습니다.

하지만 std::async에 어떤 함수를 전달한다면, 아예 쓰레드를 알아서 만들어서 해당 함수를 비동기적으로 실행하고, 그 결과값을 future에 전달합니다.

 

아래 예시를 살펴보겠습니다.

#include <iostream>
#include <future>
#include <thread>
#include <vector>

int sum(const std::vector<int>& v, int start, int end) {
	int total = 0;
	for (int i = start; i < end; i++) {
		total += v[i];
	}

	return total;
}

int parallel_sum(const std::vector<int>& v) {
	// 1 ~ 500 까지의 합
	std::future<int> lower_half_future = 
		std::async(std::launch::async, sum, cref(v), 0, v.size() / 2);

	// 501부터 1000까지의 합
	int upper_half = sum(v, v.size() / 2, v.size());

	return lower_half_future.get() + upper_half;
}

int main(void) {
	std::vector<int> v;
	v.reserve(1000);
	for (int i = 0; i < 1000; i++) {
		v.push_back(i + 1);
	}

	std::cout << "1부터 1000까지의 합 : " << parallel_sum(v) << "\n";

	return 0;
}

우선 컴파일 후 실행하면 아래의 출력 결과를 볼 수 있습니다.

async 함수는 인자로 받은 함수를 비동기로 실행하고, 해당 결과값을 보관할 future를 리턴합니다.

std::future<int> lower_half_future = 
		std::async(std::launch::async, sum, cref(v), 0, v.size() / 2);

async의 첫번째 인자는 어떠한 형태로 실행할지를 전달하는데 아래 두 가지 값으로 설정 가능합니다. 두 번째인자부터는 차례로 함수, 그리고 함수의 인자를 전달하면 됩니다.

  • std::launch::async : 바로 쓰레드를 생성하여 인자로 전달된 함수를 실행
  • std::launch::deferred : future의 get 함수가 호출되었을 때 실행(새로운 쓰레드 생성하지 않음, 동기적 실행)

이렇게 생성된 async 함수는 실행하는 함수의 결과값을 포함하는 future를 리턴하게 되고, 그 결과값은 get 함수를 통해서 얻을 수 있습니다.

return lower_half_future.get() + upper_half;

위의 parrallel 함수는 1부터 1000까지의 덧셈을 2개의 쓰레드에서 실행하는데, 1부터 500까지의 합은 async를 통해 생성된 새로운 쓰레드에서 처리하고, 나머지는 원래의 쓰레드에서 처리하게 됩니다.

 

위의 경우 1~1000까지의 덧셈이기 때문에 비동기나 동기 실행의 차이가 크지 않지만, 다음의 예제를 살펴보면 큰 차이가 난다는 것을 알 수 있습니다. 3초가 걸리는 실행 함수를 비동기 실행과 동기 실행으로 수행 시간을 출력하는 예제입니다.

#include <iostream>
#include <future>
#include <thread>
#include <vector>
#include <chrono>

int do_work(int x) {
	// do something
	std::this_thread::sleep_for(std::chrono::seconds(3));
	return x;
}

void do_work_parallel() {
	auto f1 = std::async([]() { do_work(3); });
	auto f2 = std::async([]() { do_work(3); });
	do_work(3);

	f1.get();
	f2.get();
}

void do_work_sequential() {
	do_work(3);
	do_work(3);
	do_work(3);
}

int main(void) {
	std::cout << "----- Sequential Work -----\n";
	auto t0 = std::chrono::steady_clock::now();
	do_work_sequential();
	auto t1 = std::chrono::steady_clock::now();
	std::cout << "걸린 시간 : " << std::chrono::duration_cast<std::chrono::milliseconds>(t1 - t0).count() << " ms\n";


	std::cout << "----- Parallel Work -----\n";
	t0 = std::chrono::steady_clock::now();
	do_work_parallel();
	t1 = std::chrono::steady_clock::now();
	std::cout << "걸린 시간 : " << std::chrono::duration_cast<std::chrono::milliseconds>(t1 - t0).count() << " ms\n";

	return 0;
}

컴파일 후 실행해보면, 다음의 결과를 얻을 수 있습니다.

동기 실행은 약 9초, 비동기 실행은 같은 함수임에도 3초가 걸린 것을 확인할 수 있습니다.

 


이렇게 promise, future, packaged_task, async에 대해서 알아봤는데, C++에서 제공하는 편리한 기능들을 사용하면 mutex나 condition_variable을 사용하지 않고도 매우 편리하게 비동기적 작업을 수행할 수 있습니다.

'프로그래밍 > C & C++' 카테고리의 다른 글

[C++] string과 string_view  (0) 2022.02.06
C++ 라이브러리 개요  (0) 2022.02.06
[C++] Perfect Forwarding  (0) 2021.08.13
[C++] Move Semantics  (0) 2021.08.12
[C++] 우측값 참조(rvalue reference)  (0) 2021.08.11

댓글