Skip to content

12. FastAPI의 비동기 처리 방식


1. 테스트 환경 구성

1) Google Translation API: Blocking I/O

  • Google Translation API 클라이언트 라이브러리 및 FastAPI 디펜던시 설치


pip install google-cloud-translate==2.0.1 && \
  pip install gunicorn uvicorn[standard] fastapi[all]


  • 서비스 계정 키 파일 등록


export GOOGLE_APPLICATION_CREDENTIALS="KEY_PATH"


Google Translation API 클라이언트 기본 샘플 코드
def translate_text(target, text):
    """Translates text into the target language.

    Target must be an ISO 639-1 language code.
    See https://g.co/cloud/translate/v2/translate-reference#supported_languages
    """
    import six
    from google.cloud import translate_v2 as translate

    translate_client = translate.Client()

    if isinstance(text, six.binary_type):
        text = text.decode("utf-8")

    # Text can also be a sequence of strings, in which case this method
    # will return a sequence of results for each text.
    result = translate_client.translate(text, target_language=target)

    print(u"Text: {}".format(result["input"]))
    print(u"Translation: {}".format(result["translatedText"]))
    print(u"Detected source language: {}".format(result["detectedSourceLanguage"]))


  • 테스트용 코드에서는 매 요청마다 translate.Client() 로드 시간이 소요되기 때문에 FastAPI 앱 스태틱으로 수정함


2) Gunicorn 설정

  • worker_class = "uvicorn.workers.UvicornWorker”
  • workers = 1
  • timeout = 0


3) Apach Benchmarking 설정

  • Requests = 100
  • Concurrency = 50
  • src_lang = "ko"
  • 추가 정보: google.cloud.translate_v2에서 src_lang은 사용되지 않음
  • src_text = "안녕"
  • tgt_lang = "en"
  • 추가 정보: google.cloud.translate_v2에서 tgt_lang은 가변적으로 사용할 수 있음


2. FastAPI 테스트

1) async def 함수에서 def 함수 호출

  • async def Path operation 함수에서 def 함수인 Google Translation API 요청


from fastapi import FastAPI, Body, status
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel, Field
from google.cloud import translate_v2 as translate
import six
import logging

def translate_text(translate_client, src_text, tgt_lang):
    if isinstance(src_text, six.binary_type):
        src_text = src_text.decode("utf-8")

    try:
        result = translate_client.translate(src_text, target_language=tgt_lang)
        translated = result["translatedText"]
    except:
        translated = ""

    return translated

class Data(BaseModel):
  src_lang: str = Field(..., min_length=2, max_length=5)
  src_text: str = Field(..., min_length=1, max_length=128)
  tgt_lang: str = Field(..., min_length=2, max_length=5)

logging.basicConfig(filename="./log.log", level=logging.DEBUG)
app = FastAPI()

@app.on_event("startup")
async def startup_event():
    app.google_client = translate.Client()

@app.on_event("shutdown")
async def shutdown_event():
    del app.google_client

@app.post("/", status_code=status.HTTP_200_OK)
async def main(*, data: Data = Body(...)):
    body = jsonable_encoder(data)
    src_lang = body["src_lang"]
    src_text = body["src_text"]
    tgt_lang = body["tgt_lang"]

    tgt_text = translate_text(app.google_client, src_text, tgt_lang)
    body.update({"tgt_text": tgt_text})

    return JSONResponse(content=jsonable_encoder(body))


  • Starting new HTTPS connections (1) 이외의 커넥션은 생성되지 않음
  • 즉, 처음 커넥션이 생성되면 첫 요청부터 마지막 요청까지 담당함
  • 이전 요청이 완료되면 동일한 커넥션에서 다음 요청-응답 프로세스 진행함
  • 결과적으로, Path operation 함수에 async 키워드를 붙인다고 해도 코드 내부에 Blocking I/O가 존재한다면 블로킹됨


  • 총 소요시간은 18.232초이며, 초당 처리 수는 5.48


2) async def 함수에서 run_in_threadpool() 메서드를 이용하여 def 함수 호출

  • async def Path operation 함수에서 run_in_threadpool() 메서드를 이용하여 def 함수인 Google Translation API 요청


from fastapi import FastAPI, Body, status
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel, Field
from google.cloud import translate_v2 as translate
import six
import logging

from fastapi.concurrency import run_in_threadpool

def translate_text(translate_client, src_text, tgt_lang):
    if isinstance(src_text, six.binary_type):
        src_text = src_text.decode("utf-8")

    try:
        result = translate_client.translate(src_text, target_language=tgt_lang)
        translated = result["translatedText"]
    except:
        translated = ""

    return translated

class Data(BaseModel):
  src_lang: str = Field(..., min_length=2, max_length=5)
  src_text: str = Field(..., min_length=1, max_length=128)
  tgt_lang: str = Field(..., min_length=2, max_length=5)

logging.basicConfig(filename="./log.log", level=logging.DEBUG)
app = FastAPI()

@app.on_event("startup")
async def startup_event():
    app.google_client = translate.Client()

@app.on_event("shutdown")
async def shutdown_event():
    del app.google_client

@app.post("/", status_code=status.HTTP_200_OK)
async def main(*, data: Data = Body(...)):
    body = jsonable_encoder(data)
    src_lang = body["src_lang"]
    src_text = body["src_text"]
    tgt_lang = body["tgt_lang"]

    tgt_text = await run_in_threadpool(
        translate_text, app.google_client, src_text, tgt_lang
    )
    body.update({"tgt_text": tgt_text})

    return JSONResponse(content=jsonable_encoder(body))


  • 생성할 수 있는 커넥션 수(현재 40개 - 맨 아래 정리에서 이유 확인)까지 커넥션을 생성함
  • 이전 요청이 완료되지 않아도 다음 요청을 받아서 요청-응답 프로세스를 진행함
  • 결과적으로, Path operation 함수에 async 키워드를 붙이는 경우 코드 내부에 Blocking I/O를 비동기 처리(run_in_threadpool() 메서드 이용)해야 비동기 방식으로 작동함


  • 총 소요시간은 1.445초이며, 초당 처리 수는 69.21


3) def 함수에서 def 함수 호출

  • def Path operation 함수에서 def 함수인 Google Translation API 요청 코드


from fastapi import FastAPI, Body, status
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel, Field
from google.cloud import translate_v2 as translate
import six
import logging

def translate_text(translate_client, src_text, tgt_lang):
    if isinstance(src_text, six.binary_type):
        src_text = src_text.decode("utf-8")

    try:
        result = translate_client.translate(src_text, target_language=tgt_lang)
        translated = result["translatedText"]
    except:
        translated = ""

    return translated

class Data(BaseModel):
  src_lang: str = Field(..., min_length=2, max_length=5)
  src_text: str = Field(..., min_length=1, max_length=128)
  tgt_lang: str = Field(..., min_length=2, max_length=5)

logging.basicConfig(filename="./log.log", level=logging.DEBUG)
app = FastAPI()

@app.on_event("startup")
async def startup_event():
    app.google_client = translate.Client()

@app.on_event("shutdown")
async def shutdown_event():
    del app.google_client

@app.post("/", status_code=status.HTTP_200_OK)
def main(*, data: Data = Body(...)):
    body = jsonable_encoder(data)
    src_lang = body["src_lang"]
    src_text = body["src_text"]
    tgt_lang = body["tgt_lang"]

    tgt_text = translate_text(app.google_client, src_text, tgt_lang)
    body.update({"tgt_text": tgt_text})

    return JSONResponse(content=jsonable_encoder(body))


  • 생성할 수 있는 커넥션 수(현재 40개)까지 커넥션을 생성함
  • 이전 요청이 완료되지 않아도 다음 요청을 받아서 요청-응답 프로세스를 진행함
  • 결과적으로, 코드 내부에 Blocking I/O를 비동기 처리하지 않아도 비동기 방식으로 작동하려면 Path operation 함수에 async 키워드를 붙이지 않아야 함


  • 총 소요시간은 2.361초이며, 초당 처리 수는 42.36


4) 테스트 요약

블로킹 총 소요시간 초당 처리 수
async def → def Yes 18.232 5.48
run_in_threadpool() No 1.445 69.21
def → def No 2.361 42.36


3. 정리

1) asyncio 라이브러리

  • Python에서 비동기 처리를 할 때는 asyncio를 이용함
  • asyncio는 CPU bound보다는 Blocking I/O를 처리하는 데 초점을 둠(CPU bound도 저수준 API를 이용하여 구현하면 async/await 가능함)
  • asyncio는 네이티브 코루틴(Coroutine) 개념을 이용하여 구현되었으며, 기본적인 동작 방식은 하나의 쓰레드가 마치 멀티 태스킹처럼 메인 루틴과 코루틴을 번갈아가며 코드를 실행함
  • async def 함수와 def 함수일 때 모두 비동기 처리를 할 수 있지만, 두 가지 상황에 대한 처리 방식이 다름


  • async def 함수일 때는 네이티브 코루틴의 개념을 이용하여 비동기 처리를 함
  • 이벤트 루프 안에 태스크를 넣어 일반적으로 모든 태스크가 끝날 때까지(중간에 끊는 경우도 있음) 이벤트 루프를 돌리는 것으로 구현함
  • async.sleep() 등과 같이 네이티브 코루틴으로 구현된 Blocking I/O는 await 할 수 있음


  • def 함수일 때는 프로세스 풀(Process pool) 또는 쓰레드 풀(Thread pool)을 이용하여 비동기 처리를 함
  • def 함수일 때는 await 할 수 없기 때문에 네이티브 코루틴 방식으로는 비동기 처리를 할 수 없음
  • 이런 이유로 프로세스/쓰레드 풀을 이용하는 것이며, Context switching을 통해 프로세스/쓰레드를 번갈아가며 코드를 실행하는 것으로 비동기 처리를 할 수 있는 것임


  • 즉, async def 함수는 멀티 태스킹 방식처럼 비동기 처리를 하는 것이고, def 함수는 멀티 프로세싱/쓰레딩 방식으로 비동기 처리를 하는 것임


  • def 함수의 비동기 처리는 프로세스/쓰레드 풀을 이용하여 해결한다고 했는데, Python GIL 때문에 멀티 쓰레딩을 사용하지 않는 것이 좋다고 생각할 수 있지만, GIL은 CPU bound에서 공유 자원에 대해 발생할 수 있는 Race condition 문제 때문에 필요한 것임
  • 그래서 def 함수의 비동기 처리 방식은 CPU bound일 경우 멀티 프로세싱 방식으로, Blocking I/O일 경우 멀티 쓰레딩 방식으로 구분해서 해결함
  • 이 때 사용하는 메서드가 asyncioloop.run_in_executor() 메서드이고, 호출 형태는 run_in_executor(executor, func, *args)
  • CPU bound인 경우 ProcessPoolExecutor를 생성하고, Blocking I/O인 경우 ThreadPoolExecutor(디폴트 executor)를 생성하여 run_in_executor() 메서드에 첫 번째 인자로 넣음


  • 다음은 asyncio 공식 문서에 있는 run_in_executor() 메서드 샘플 코드임
  • CPU bound일 때와 Blocking I/O를 구분해서 executor를 다르게 생성한 후 인자에 넣는 것을 확인할 수 있음


import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

asyncio.run(main())


2) FastAPI 프레임워크

  • pip install uvicorn[standard]로 Uvicorn을 설치한 경우, CLI에서 uvicorn을 실행하면 uvloop라는 이벤트 루프를 생성한 상태가 됨
  • Uvicorn 코드를 확인해보면 uvloop 또한 asyncio의 추상 클래스를 상속받아 구현되었으며, run_in_executor()도 동일하게 구현되었음


  • 마찬가지로 uvloopasyncio의 비동기 처리 방식처럼 네이티브 코루틴으로 구현되어 있어야만 프로세스/쓰레드 풀을 사용하지 않고 비동기 처리를 할 수 있음
  • 그래서 def Path operation 함수에서 def 함수를 호출하는 경우, Uvicorn이 def Path operatoin 함수를 호출할 때 run_in_executor() 메서드에 넣어 실행하기 때문에 멀티 쓰레딩 방식(디폴트는 Blocking I/O)으로 비동기 처리가 되는 것임
  • async def Path operation 함수에서 def 함수를 호출하는 방식은 결국 async def Path operation 함수 안에서 이벤트 루프 또는 프로세스/쓰레드 풀 방식의 비동기 처리 코드가 구현되지 않았기 때문에 블로킹되는 것임


  • 위의 테스트용 코드에서 def 함수인 Google Translation API를 비동기로 호출하기 위해 run_in_threadpool()의 인자로 def 함수를 넣었음


from fastapi.concurrency import run_in_threadpool

...

tgt_text = await run_in_threadpool(
    translate_text, app.google_client, src_text, tgt_lang
)


  • asynciorun_in_executor() 메서드의 인자로 ThreadPoolExecutor를 넣어야 하는 것을 FastAPI에서는 fastapi.concurrency.run_in_threadpool() 메서드로 간단하게 해결할 수 있게 구현함(이상한 점이 run_in_processpool() 메서드는 없음 - 못찾았을 수도 있음)


  • 추가적으로 최대 커넥션 수(최대 쓰레드 수)는 40개였는데, 그 이유는 최대 워커(max_workers)가 None이거나 주어지지 않았다면, 최대 쓰레드 수는 기본값으로 프로세서 수 * 5를 사용함
  • 그래서 테스트 환경처럼 8코어 CPU 환경이라면 최대 쓰레드 수는 40개가 되는 것임


3) 결론

(1) 네이티브 코루틴을 구현할 수 있거나 써드 파티 라이브러리(aiohttp 등)를 이용하는 경우

1] CPU bound

async def Path operation 함수 → await CPU bound 함수()
from fastapi import FastAPI
import asyncio

...

async def cpu_bound():
    ...

@app.get("/")
async def main():
    await cpu_bound()
    ...

2] Blocking I/O

async def Path operation 함수 → await Blocking I/O 함수()
from fastapi import FastAPI
import asyncio

...

async def blocking_io():
    ...

@app.get("/")
async def main():
    await blocking_io()
    ...


(2) 네이티브 코루틴을 구현할 수 없는 경우

1] CPU bound

from fastapi import FastAPI

...

def cpu_bound():
    ...

@app.get("/")
def main():
    cpu_bound()
    ...
from concurrent.futures import ProcessPoolExecutor
from fastapi import FastAPI

...

def cpu_bound():
    ...

@app.get("/")
async def main():
    with ProcessPoolExecutor() as p_pool:
        await run_in_executor(p_pool, cpu_bound)
    ...

2] Blocking I/O

from fastapi import FastAPI

...

def blocking_io():
    ...

@app.get("/")
def main():
    blocking_io()
    ...
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI

...

def blocking_io():
    ...

@app.get("/")
async def main():
    with ThreadPoolExecutor() as t_pool:
        await run_in_executor(t_pool, blocking_io)
    ...

References