본문 바로가기
프로그래밍/Python

[Python] Futures

by 별준 2022. 3. 30.

References

  • Fluent Python

Contents

  • concurrent.futures 라이브러리
  • Future 객체
  • Blocking I/O와 GIL
  • Executor.map()

이번 포스팅에서는 concurrent.futures 라이브러리에 대해서 알아보려고 합니다. 이 라이브러리는 여러 독립적인 스레드를 생성하고 결과를 큐에 수집하는 간단한 패턴을 구현해줍니다. 이 패키지는 프로세스도 지원하기 때문에 연산 집약적인 태스크에서 유용합니다.

 

그리고 비동기 작업의 실행을 나타내는 객체인 "future"의 개념도 간단하게 살펴보겠습니다. 이 개념은 concurrent.futures뿐만 아니라 asyncio 패키지의 기반이 되기도 합니다.

 

우선 간단한 웹 다운로드 예제를 통해 살펴보도록 하겠습니다.

Concurrent Web Downloads

긴 지연 시간 동안 CPU 클럭을 낭비하지 않으면서 네트워크 입출력을 효율적으로 처리하려면 동시성(concurrency)를 이용해야 합니다. 프로그램은 네트워크에서 응답이 도착할 때까지 다른 일을 처리하는 것이 옳습니다.

 

이러한 포인트를 보여주기 위해서, 웹으로부터 20개 국가의 국기 이미지를 다운로드하는 간단한 프로그램을 3개 작성해보려고 합니다. 첫 번째 flags.py는 순차적으로 실행되며 이전 이미지를 내려받아서 저장한 후에 다음 이미지를 내려받습니다. 나머지 두 프로그램은 동시에 이미지들을 내려받습니다. 즉, 모든 이미지에 대해 동시에 요청하고, 응답이 도착하는 대로 파일에 저장합니다. flags_threadpool.py 스크립트는 concurrent.futures 패키지를 사용하고, flags_asyncio.py는 asyncio를 사용합니다.

 

A Sequential Download Script

별로 중요한 코드는 아니지만, 순차 프로그램으로 작성된 스크립트의 몇몇 함수가 동시성 스크립트에 사용되기 때문에 한 번 살펴보도록 하겠습니다.

# flags.py
import time
from pathlib import Path
from typing import Callable

import requests

POP20_CC = ('KR IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

BASE_URL = 'http://fluentpython.com/data/flags'
DEST_DIR = Path('downloaded') # The directory with the flag images

# img 바이트를 DEST_DIR의 filename으로 저장
def save_flag(img: bytes, filename: str) -> None:
    (DEST_DIR / filename).write_bytes(img)

# 주어진 country code로 URL을 구축하고 requests를 사용해 이미지를 다운받음
# 응답은 바이너리 데이터를 반환함
def get_flag(cc: str) -> bytes:
    url = f'{BASE_URL}/{cc}/{cc}.gif'.lower()
    resp = requests.get(url)
    return resp.content

# loop를 돌며 알파벳 순으로 각 country code에 맞는 이미지를 다운받음(key function)
def download_many(cc_list: list[str]) -> int:
    for cc in sorted(cc_list):
        image = get_flag(cc)
        save_flag(image, f'{cc}.gif')
        print(cc, end=' ', flush=True)
    return len(cc_list)

# main 함수는 다운로드를 수행하는 함수를 인수로 받아서 호출됨
# 이 함수는 다른 스크립트에서도 사용됨
def main(downloader: Callable[[list[str]], int]) -> None:
    DEST_DIR.mkdir(exist_ok=True)
    t0 = time.perf_counter()
    count = downloader(POP20_CC)
    elapsed = time.perf_counter() - t0
    print(f'\n{count} downloads in {elapsed:.2f}s')

if __name__ == '__main__':
    main(download_many)

사실 별로 새로운 것이 없는 코드입니다. 이 스크립트를 실행하면 다음과 같은 결과를 확인하실 수 있습니다.

제 PC에서 순차 스크립트는 약 18.8초가 걸렸습니다.

 

Downloading with concurrent.futures

이번에는 concurrent.futures를 사용해서 구현된 스크립트를 살펴보겠습니다.

concurrent.futures 패키지의 주요 기능은 ThreadPoolExecutor와 ProcessPoolExecutor 클래스입니다. 이 클래스들은 Callable 객체를 서로 다른 스레드나 프로세스에서 실행할 수 있게 해주는 인터페이스를 구현합니다. 이 클래스들은 worker 스레드나 work 프로세스를 관리하는 풀과 작업을 분배하는 큐와 결과를 수집하는 큐를 관리합니다. 하지만 아주 고수준(high level)의 인터페이스를 구현하고 있어서 국기 이미지를 내려받는 간단한 프로그램을 구현할 때 내부 동작 과정을 상세히 알 필요는 없습니다.

 

flags_threadpool.py 스크립트는 ThreadPoolExecutor.map() 메소드를 사용해서 동시에 이미지를 내려받는 작업을 아주 간단하게 구현합니다.

# flags_threadpool.py
from concurrent import futures
# Reuse functions in flags.py
from flags import save_flag, get_flag, main

# 하나의 국기 이미지를 다운받는 함수. 각 worker가 이 함수를 수행함
def download_one(cc: str):
    image = get_flag(cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

def download_many(cc_list: list[str]) -> int:
    # context manager로서 ThreadPoolExecutor를 인스턴스화 합니다.
    # executor.__exit__() 메소드는 executor.shutdown(wait=True)를 호출하는데,
    # 이는 스레드가 완료될 때까지 블락시킵니다.
    with futures.ThreadPoolExecutor() as executor:
        # map 메소드는 내장 함수 map과 유사합니다.
        # 첫 번째 인수인 download_one 함수는 여러 스레드에서 동시에 호출됩니다.
        # map 메소드는 각 함수 호출에서 리턴되는 값들을 반복할 수 있는 제너레이터를 반환합니다.
        # 여기서는 country code를 반환
        res = executor.map(download_one, sorted(cc_list))
    
    return len(list(res))

if __name__ == '__main__':
    # 이 스크립트에서 구현한 downlaod_many 콜러블을 인수로 전달하여,
    # flags.py에서 구현한 main 함수 호출
    main(download_many)

참고로 download_one 함수는 순차 프로그램 flags.py의 download_many()의 for문의 바디와 같습니다.

이렇게 작성한 flags_threadpool.py 스크립트 코드는 매우 짧은데, 이처럼 레거시 시퀀스 코드를 재사용하여 동시 실행을 간단히 추가할 수 있습니다.

 

위 스크립트를 실행한 결과는 다음과 같습니다.

1.93초로 약 10배 가량 빨라졌습니다.

 

ThreadPoolExecutor 생성자는 여기서는 사용하지 않았지만 여러 인수들을 받습니다. 그중 첫 번째에 위치하면서 중요한 인수 중 하나인 max_workers가 있습니다. 이 인수는 worker 스레드로 실행할 최대 갯수를 설정합니다. 파이썬 3.4까지는 max_workers가 필수였지만, 3.5부터는 옵셔널이 되었으며 기본값은 None입니다. max_workers가 None일 때 ThreadPoolExecutor는 다음의 표현식을 사용하여 이 값을 결정합니다(since Python 3.8).

max_workers = min(32, os.cpu_count() + 4)

계산된 기본값의 max_workers도 괜찮지만, ThreadPoolExecutor는 불필요한 새로운 worker를 실행하지 않도록 할 수 있습니다.

 

 

Where Are the Futures ?

라이브러리의 이름이 concurrent.futures 인데, flags_threadpool.py 코드에서는 Future을 볼 수 없습니다. Future는 concurrent.futures와 asyncio의 내부에 있는 핵심 컴포넌트인데, 이 라이브러리의 사용자에게 드러나지 않는 경우가 종종 있습니다. 위의 예제 코드에서는 암묵적으로 Future를 사용하지만, 코드에서 직접 건들이지는 않습니다.

 

파이썬 3.4 표준 라이브러리에서 Future라는 이름을 가진 클래스는 concurrent.futures.Future와 asyncio.Future입니다. 이 두 Future 클래스의 객체는 완료되었을 수도 있고 아닐 수도 있는 지연된 계산을 표현하기 위해 사용됩니다. Future 클래스는 Twisted의 Deferred 클래스, Tornado의 Future 클래스, 자바스크립트 라이브러리의 Promise 객체와 비슷합니다.

 

Future는 대기 중인 작업을 큐에 넣고, 완료 상태를 조사하고, 결과(혹은 예외)를 가져올 수 있도록 캡슐화합니다.

 

일반적으로 Future에 대해 알아야할 중요한 점은 직접 객체를 생성하면 안 된다는 것입니다. Future 객체는 concurrent.futures나 asyncio와 같은 동시성 프레임워크에서만 배타적으로 생성해야 합니다. 이유는 간단합니다. Future는 앞으로 일어날 일을 나타내고, Future의 실행을 스케쥴링하는 프레임워크만이 어떤 일이 일어날지 확실히 알 수 있기 때문입니다. 따라서 concurrent.futures.Future 객체는 concurrent.futures.Executor의 서브클래스로 실행을 스케쥴링한 후에만 생성됩니다. 예를 들어, Executor.submit() 메소드는 Callable을 받아서, 이 Callable의 실행을 스케쥴링하고 Future 객체를 반환합니다.

 

클라이언트 코드는 Future 객체의 상태를 직접 변경하면 안됩니다. Future 객체가 나타내는 연산이 완료되었을 때, 동시성 프레임워크가 Future 객체의 상태를 변경하기 때문입니다. 우리는 이 객체의 상태가 언제 바뀔지 제어할 수 없습니다.

 

두 프레임워크의 Future 클래스는 논블로킹(non-blockin)이며 이 객체에 연결된 Callable의 실행이 완료되었는지 여부를 boolean형으로 반환하는 done() 메소드가 있습니다. 일반적으로 클라이언트 코드는 Future가 완료되었는지 직접 물어보지 않고, 통지해달라고 요청합니다. 그렇기 때문에 이 두 Future 클래스에 add_done_callback() 메소드가 있습니다. 이 메소드에 하나의 인수를 받는 callable을 전달하면, Future 객체의 작업이 완료되었을 때 이 callable이 호출되면서 Future 객체를 인수로 받습니다.

 

이 두 프레임워크의 Future 클래스는 result() 메소드도 가지고 있는데, 완료된 경우 둘 다 Callable의 결과를 반환하거나, Callable이 실행될 때 발생한 예외를 다시 발생시킵니다. 그러나 Future 객체의 실행이 완료되지 않았을 때는 이 두 프레임워크의 result() 메소드 작동이 완전히 다릅니다.

concurrent.futures.Future 객체의 경우, f.result()는 결과가 나올 때까지 호출자의 스레드를 블로킹합니다. 선택적으로 timeout 인수를 전달할 수 있으며, 지정한 시간까지 Future 객체의 작업이 완료되지 않으면, TimeoutError 예외가 발생합니다. 그러나 asyncio.Future.result()는 타임아웃을 지원하지 않으며 yield from을 사용해서 객체의 상태를 가져오는 방법을 선호합니다. concurrent.futures.Future는 yield from을 사용할 수 없습니다.

 

두 라이브러리에는 Future 객체를 반환하는 함수가 많이 있습니다. 나머지 함수들은 사용자에게 보이지 않도록 내부에서 Future 객체를 사용합니다. 위의 예제 코드에서 본 Executor.map() 메소드는 내부에서 Future 객체를 사용하는 예입니다. Executor.map()이 반환하는 반복형 객체는 __next__() 메소드가 호출될 때마다 Future 객체의 result() 메소드를 호출하므로, Future 객체 자제가 아니라 Future 객체의 결과를 가져올 수 있게 해줍니다.

 

Future 객체를 실제로 사용해보기 위해서 위의 예제 코드를 concurrent.futures.as_completed() 함수를 사용하도록 수정해보도록 하겠습니다. as_completed() 함수는 Future 객체를 담은 반복형을 인수로 받아, 완료된 Future 객체를 생성하는 이터레이터를 반환합니다.

 

as_completed()를 사용하려면 download_many() 함수만 변경해주면 됩니다. 상위 수준의 executor.map() 메소드는 Future 객체를 생성하고 스케쥴링하는 루프와 Future 객체의 결과를 가져오는 루프로 분할됩니다. 코드를 수정하면서 print()문을 추가해서 완료 전후의 Future 객체를 출력하도록 해보겠습니다. 수정한 코드는 다음과 같습니다.

# flags_threadpool_futures.py
from concurrent import futures
# Reuse functions in flags.py
from flags import save_flag, get_flag, main

# 하나의 국기 이미지를 다운받는 함수. 각 worker가 이 함수를 수행함
def download_one(cc: str):
    image = get_flag(cc)
    save_flag(image, f'{cc}.gif')
    print(cc, end=' ', flush=True)
    return cc

def download_many(cc_list: list[str]) -> int:
    cc_list = cc_list[:5] # 5개의 나라만 사용
    # 대기 중인 Future를 살펴보기 위해 max_workers를 3으로 설정
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do: list[futures.Future] = []
        for cc in sorted(cc_list):
            # submit은 콜러블이 실행되도록 스케쥴링하고 이 작업을 나타내는 Future를 반환한다
            future = executor.submit(download_one, cc)
            to_do.append(future)
            print(f'Scheduled for {cc}: {future}')
        
        # as_completed()는 Future가 완료될 때 해당 Future 객체를 생성
        for count, future in enumerate(futures.as_completed(to_do), 1):
            res: str = future.result()
            print(f'{future} result: {res!r}')
    
    return count

if __name__ == '__main__':
    # 이 스크립트에서 구현한 downlaod_many 콜러블을 인수로 전달하여,
    # flags.py에서 구현한 main 함수 호출
    main(download_many)

download_many() 함수의 길이가 길어졌지만, 이제 Future 객체를 직접 볼 수 있습니다. 나머지 함수는 이전과 동일합니다.

 

여기서는 as_completed()가 반환한 완료된 Future 객체를 사용하므로 future.result()가 절대 블로킹되지 않습니다. 위 코드를 실행하면 다음과 같은 결과가 나옵니다.

처음 다섯 줄의 출력을 보면 Future 객체가 알파벳순으로 스케쥴링된 것을 확인할 수 있습니다. 그리고 Future 객체의 repr() 메소드가 상태를 보여주는데, 처음 3개만 실행 중이며, 이는 worker 스레드 수를 최대 3개로 설정했기 때문입니다. 따라서 마지막 2개의 Future 객체는 pending 중으로, worker 스레드를 기다리고 있습니다. 그 다음 다섯 줄의 출력에서 앞에 두 글자(국가)는 download_one 에서 출력한 것이며 그 뒤는 download_many에서 출력한 것입니다. 뒤의 다섯 줄의 출력은 정렬되지 않은 상태인 것을 확인할 수 있으며 그 순서는 실행할 때마다 바뀝니다.

 

몇 번 다시 실행하다보면, 다음과 같이 출력될 때도 있습니다.

처음 다섯 줄의 출력 이후의 출력을 보면 메인 스레드의 download_many()에서 BR의 결과를 출력하기 전에 BR과 IN 스레드가 국가 코드를 먼저 출력합니다.

 

사실 엄밀히 말하자면, 지금까지 테스트한 동시성 스크립트는 어느 것도 병렬로 내려받을 수 없습니다. 이는 concurrent.futures는 GIL에 의해 제한되기 때문입니다.

그렇다면 방금 수행한 간략한 성능 측정에 대해 의문이 생길 수 있습니다. 파이썬 스레드가 한 번에 한 스레드만 실행할 수 있게 강제하는 GIL에 의해 제한되는데, 동시성 스크립트의 성능이 순차 스크립트보다 10배가량 빠른데, 어떻게 이렇게 실행될 수 있을까요?

(심지어 asyncio를 사용한 버전 또한 순차 스크립트보다 더 빠른데, 둘 다 단일 스레드로 돌아갑니다. 이에 대한 내용은 다음 포스팅에서 살펴보겠습니다.)

 


Blocking I/O와 GIL

CPython 인터프리터는 내부적으로 thread-safe하지 않으므로, 전역 인터프리터 락(GIL)을 가지고 있습니다. GIL은 한 번에 한 스레드만 파이썬 바이트코드를 실행하도록 제한합니다. 그렇기 때문에 단일 파이썬 프로세스가 동시에 다중 CPU 코어를 사용할 수 없습니다. 이 제한은 CPython 인터프리터의 제한이지, 파이썬 언저 자체의 제한은 아닙니다. Jythone이나 IronPython은 이런 제약이 없습니다. 하지만 가장 빠른 인터프리터인 PyPy도 GIL을 가지고 있습니다.

 

파이썬 코드를 작성할 때 우리는 GIL을 제어할 수 없지만, 내장 함수나 C로 작성된 익스텐션은 시간이 오래 걸리는 작업을 실행할 때 GIL을 해제할 수 있습니다. 사실 C로 작성된 파이썬 라이브러리는 GIL을 관리하고, 자신의 OS 스레드를 생성해서 가용한 CPU 코드를 모두 사용할 수 있습니다. 하지만 라이브러리 코드가 상당히 복잡해지므로, 대부분의 라이브러리 제작자는 이런 방식으로 구현하지 않습니다.

 

하지만 블로킹 입출력을 실행하는 모든 표준 라이브러리 함수는 OS에서 결과를 기다리는 동안 GIL을 해제합니다. 즉, 입출력 위주의 작업을 실행하는 파이썬 프로그램은 파이썬으로 구현하더라도 스레드를 이용함으로써 이득을 볼 수 있다는 것입니다. 파이썬 쓰레드가 네트워크로부터의 응답을 기다리는 동안, 블로킹된 입출력 함수가 GIL을 해제함으로써 다른 스레드가 실행될 수 있습니다.

 

파이썬 표준 라이브러리의 모든 블로킹 입출력 함수는 GIL을 해제해서 다른 스레드가 실행할 수 있게 해줍니다. time.sleep() 함수도 GIL을 해제합니다. 따라서 GIL을 사용하고 있더라도 파이썬 쓰레드는 입출력 위주의 어플리케이션에서는 엄청난 효용성이 있습니다.

 


Launching Processes with concurrent.futures

이번에는 concurrent.futures를 사용를 사용해서 계산 위주의 작업에서 GIL을 우회하는 방법을 살펴보겠습니다.

 

concurret.futures의 문서(link)의 부제는 'Launching parallel tasks'입니다. 이 패키지는 ProcessPoolExecutor 클래스를 사용해서 작업을 여러 파이썬 프로세스에 분산시켜 진정한 병렬 컴퓨팅을 가능하게 합니다. ProcessPoolExecutor는 GIL을 우회하므로 계산 위주의 작업을 수행해야 하는 경우 사용 가능한 CPU를 모두 사용합니다.

 

ProcessPoolExecutor와 ThreadPoolExecutor는 모두 범용 Executor 인터페이스를 구현하므로, concurrent.futures를 사용하는 경우에는 스레드 기반의 프로그램을 프로세스 기반의 프로그램으로 쉽게 변환할 수 있습니다.

 

위에서 살펴본 국기 이미지를 내려받는 프로그램처럼 입출력 위주의 작업에서는 ProcessPoolExecutor를 사용해도 큰 효과는 없습니다. download_many() 함수에서 ThreadPoolExecutor를 ProcessPoolExecutor로 바꿔서 실행해보면 이 사실을 쉽게 확인할 수 있습니다.

 

ProcessPoolExecutor 또한 max_workers 파라미터가 있지만 기본값은 None 입니다. 이 경우 worker의 갯수는 os.cpu_count()가 반환하는 값으로 설정됩니다.

 

Multi-core Prime Checker Redux

[Python] Concurrency Models

이전 포스팅에서 소수 판별 코드를 multiprocessing 모듈을 사용해서 구현(procs.py)했었습니다. 이번에는 같은 프로그램을 ProcessPoolExecutor를 사용해서 작성해보도록 하겠습니다. primes.py의 코드는 이전 포스팅을 참조바랍니다.

 

# proc_pool.py
import sys
from concurrent import futures
from time import perf_counter
from typing import NamedTuple

from primes import is_prime, NUMBERS

class PrimeResult(NamedTuple):
    n: int
    flag: bool
    elapsed: float

def check(n: int) -> PrimeResult:
    t0 = perf_counter()
    res = is_prime(n)
    return PrimeResult(n, res, perf_counter() - t0)

def main() -> None:
    if len(sys.argv) < 2:
        workers = None
    else:
        workers = int(sys.argv[1])
    
    # 실제 worker의 개수를 확인하기 위해서 with 블록 전에 ProcessPoolExecutor 생성
    executor = futures.ProcessPoolExecutor(workers)
    actual_workers = executor._max_workers

    print(f'Checking {len(NUMBERS)} numbers with {actual_workers} processes:')

    t0 = perf_counter()

    numbers = sorted(NUMBERS, reverse=True) # 내림차순 정렬
    with executor: # context manager로 executor 사용
        # executor.map() 호출은 PrimeResult 인스턴스를 반환
        for n, prime, elapsed in executor.map(check, numbers):
            label = 'P' if prime else ' '
            print(f'{n:16}  {label} {elapsed:9.6f}s')
    
    time = perf_counter() - t0
    print(f'Total time: {time:.2f}s')

if __name__ == '__main__':
    main()

위 스크립트를 실행하면 다음의 결과를 확인할 수 있습니다.

실행해보시면 알겠지만, 첫 번째 줄의 결과는 아주 빠르게 출력되지만 두 번째 줄의 결과는 출력되는데 거의 8.6초 정도 걸립니다. 그리고 나머지 결과들은 두 번째 줄의 결과가 출력되는 즉시 거의 바로 출력됩니다.

 

proc_pool.py 스크립트가 이렇게 동작하는 이유는 다음과 같습니다.

  • executor.map(check, numbers)는 주어진 numbers와 같은 순서로 결과를 반환합니다.
  • 기본적으로 proc_pool.py는 CPU 개수만큼의 worker를 사용합니다. 제 PC의 경우에는 12개의 프로세스가 사용되었습니다.
  • numbers를 내림차순으로 전달했기 때문에, 첫 번째 숫자는 9로 나누어지므로 빠르게 리턴됩니다.
  • 두 번째 숫자는 샘플에서 가장 큰 소수입니다. 따라서 이 숫자는 소수를 판별하는데 다른 모든 숫자들보다 더 오래 걸립니다.
  • 반면 나머지 11개의 프로세스 나머지 숫자들의 소수 여부를 판별합니다.
  • 두 번째 숫자의 판별이 완료될 때, 다른 모든 프로세스들도 나머지 작업들을 끝낸 상태이기 때문에 결과는 이후에 거의 즉시 출력됩니다.

 


Experimenting with Executor.map

동시 프로그램의 동작을 더 쉽게 이해하기 위해서 Executor.map의 동작을 시각화해서 살펴보도록 하겠습니다.

위에서 살펴본 것처럼 Executor.map() 메소드를 이용하면 여러 콜러블을 아주 간단히 동시에 실행시킬 수 있습니다. 아래의 예제 스크립트는 Executor.map()이 작동하는 과정을 자세히 보여줍니다.

# demo_executor_map.py
from time import sleep, strftime
from concurrent import futures

# 이 함수는 자신이 받은 인수 앞에 [HH:MM:SS] 포맷의 타임스탬프를 찍어서 출력
def display(*args):
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)

# loiter() 함수는 단지 시작할 때 메세지를 출력하고, 인수로 받은 n초동안 sleep하고,
# 마지막 메세지를 출력. 메세지 앞에는 n개의 탭을 붙여 들여씀
def loiter(n):
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n*10 # 결과를 가져오는 방법을 보여주기 위해 n*10을 반환

def main():
    display('Script starting.')
    # 3개의 스레드를 가진 ThreadPoolExecutor 객체를 생성
    executor = futures.ThreadPoolExecutor(max_workers=3)
    # executor에 5개의 작업을 요청. 스레드가 3개뿐이므로
    # 일단은 loiter(0), loiter(1), loiter(2) 작업만 먼저 실행. map() 메소드는 논블로킹 메소드임
    results = executor.map(loiter, range(5))
    # executor.map()이 반환한 값을 바로 출력함. 출력 결과를 보면 알겠지만, map()은 제너레이터를 반환함
    display('results:', results)
    display('Waiting for individual results:')
    # for 루프 내에서 enumerate()를 호출하면 암묵적으로 next(results)를 호출하는데,
    # next(results)는 먼저 내부적으로 첫 번째 호출한 loiter(0)을 나타내는
    # Future 객체 _f의 result() 메소드를 호출한다. _f.result() 메소드는 _f가 완료될 때까지 블로킹되므로,
    # 다음 결과가 나올 때까지 이 루프는 블로킹된다.
    for i, result in enumerate(results):
        display(f'result {i}: {result}')

if __name__ == '__main__':
    main()

위 스크립트를 실행한 결과는 다음과 같습니다.

코드가 실행된 시간은 21:50:48입니다. 첫 번째 스레드가 loiter(0)을 실행하면, 이 함수는 0초간 sleep하고 두 번째 스레드가 시작되기 전에 반환할 수도 있지만 경우에 따라 달라질 수 있습니다. 그리고, 스레드 풀에는 스레드가 3개 있으므로 loiter(1)과 loiter(2)는 바로 실행됩니다.

 

line 26의 출력 결과를 보면, executor.map() 메소드가 반환한 값이 제너레이터임을 알 수 있습니다. 작업의 수와 max_workers 설정에 상관없이 여기까지는 전혀 블로킹되지 않고 실행됩니다. 그리고 for문에서 loiter()를 호출할 때 전달한 인수에 따라 실행이 블로킹될 수 있습니다. results 제너레이터의 __next__() 메소드는 첫 번째 Future 객체가 완료될 때까지 대기해야 합니다. 여기서는 이 루프를 시작하기 전에 loiter(0)이 완료되었으므로 블록되지 않습니다. 여기까지는 모두 시작 시간인 21:50:48에 발생합니다.

그리고 loiter(1)은 1초 후인 21:50:49에 완료됩니다. 그럼 이제 이 스레드는 loiter(4)를 실행하게 됩니다. 이후 loiter(1)의 결과인 10이 출력되고, for 루프는 loiter(2)의 결과를 기다리면서 블로킹될 것입니다.

이러한 형태가 반복되면서 loiter(2)가 완료되어 결과를 출력하고, loiter(3)도 마찬가지입니다.

loiter(4)가 완료되기까지는 2초가 걸리는데, 이 함수는 21:50:49에 시작되어 4초간 sleep되었기 때문입니다.

 

Executor.map()은 사용하기 쉽지만, 호출한 순서 그대로 결과를 반환한다는 특징이 있습니다. 이러한 특징은 상황에 따라 도움이 되기도 하고 아닐 수도 있습니다. 첫 번째 호출이 결과를 생성할 때까지 10초가 걸리고 나머지 호출은 1초씩 걸린다면, map()이 반환한 제너레이터가 첫 번째 결과를 가져오기까지 10초가 걸립니다. 그 후 다른 함수는 이미 실행을 완료했을테니 나머지 결과는 바로 가져올 수 있습니다. 더 진행하기 전에 모든 결과가 필요한 경우라면 이 특징은 문제가 되지 않지만, submit()한 순서와 상관없이 완료되는 대로 결과를 가져오는 것이 더 좋은 경우도 있습니다.

 

완료되는 대로 결과를 가져오려면 위에서 본 Executor.submit() 메소드와 futures.as_completed() 함수를 사용해야 합니다.

submit()이 다양한 콜러블과 인수를 제출할 수 있는 반면 executor.map()은 여러 인수에 동일한 콜러블을 실행하도록 설계되어 있으므로, executor.submit()/futures.as_completed() 조합이 executor.map()보다 융통성이 높습니다. 게다가 일부는 ThreadPoolExecutor 객체에서, 다른 일부는 ProcessPoolExecutor 객체에서 가져오는 등 여러 executor에서 가져온 Future 객체의 집합을 futures.as_completed()에 전달할 수 있습니다.

 

 


파이썬 쓰레드는 입출력 위주의 어플리케이션에 잘 맞으며, 경우에 따라 concurrent.futures 패키지를 이용하면 아주 간단히 처리할 수 있습니다.

 

하지만 ThreadPoolExecutor나 ProcessPoolExecutor가 적절히 처리할 수 없는 경우가 있습니다.

 

파이썬은 0.9.8 버전(1993년)부터 스레드를 지원했습니다. concurrent.futures는 단지 스레드를 사용하기 위해 나중에 추가된 방법일 뿐입니다. 파이썬 3에서는 원래의 thread 모듈의 사용 중단을 안내했으며 더 높은 수준의 threading 모듈의 사용을 권장하고 있습니다. futures.ThreadPoolExecutor로 처리하기 어려운 작업을 수행하는 경우 Thread, Lock, Semaphore 등 threading 모듈의 기본 컴포넌트를 이용해서 처리할 수 있습니다. 스레드 간에 데이터를 전송할 때는 queue 모듈에서 제공하는 thread-safe한 큐를 사용할 수 있습니다. 이 컴포넌트들은 futures.TreadPoolExecutor에 캡슐화되어 있습니다.

 

계산 위주의 작업을 수행하는 경우에는 여러 프로세스를 실행해서 GIL을 피해야 합니다. futures.ProcessPoolExecutor를 사용하면 간단히 처리할 수 있지만, 어플리케이션의 구조가 이 클래스와 잘 맞지 않는 경우에는, threading API와 비슷하지만 작업을 여러 프로세스에 할당하는 multiprocessing 패키지를 사용해야 합니다. 간단한 프로그램이라면 약간만 수정해도 threading 대신 multiprocessing을 사용할 수 있습니다. 협업이 필요한 프로세스들은 데이터 공유라는 큰 문제를 해결해야 하는데, multiprocessing 패키지에는 이 문제를 쉽게 해결해주는 장치도 있습니다.

댓글