12. FastAPI의 비동기 처리 방식
1. 테스트 환경 구성
1) Google Translation API: Blocking I/O
- Google Translation API 클라이언트 라이브러리 및 FastAPI 디펜던시 설치
- 서비스 계정 키 파일 등록
Google Translation API 클라이언트 기본 샘플 코드
def translate_text(target, text):
"""Translates text into the target language.
Target must be an ISO 639-1 language code.
See https://g.co/cloud/translate/v2/translate-reference#supported_languages
import six
from google.cloud import translate_v2 as translate
translate_client = translate.Client()
if isinstance(text, six.binary_type):
text = text.decode("utf-8")
# Text can also be a sequence of strings, in which case this method
# will return a sequence of results for each text.
result = translate_client.translate(text, target_language=target)
print(u"Text: {}".format(result["input"]))
print(u"Translation: {}".format(result["translatedText"]))
print(u"Detected source language: {}".format(result["detectedSourceLanguage"]))
- 테스트용 코드에서는 매 요청마다
로드 시간이 소요되기 때문에 FastAPI 앱 스태틱으로 수정함
2) Gunicorn 설정
worker_class = "uvicorn.workers.UvicornWorker”
workers = 1
timeout = 0
3) Apach Benchmarking 설정
Requests = 100
Concurrency = 50
src_lang = "ko"
- 추가 정보:
은 사용되지 않음 src_text = "안녕"
tgt_lang = "en"
- 추가 정보:
은 가변적으로 사용할 수 있음
2. FastAPI 테스트
1) async def
함수에서 def
함수 호출
async def
Path operation 함수에서def
함수인 Google Translation API 요청
from fastapi import FastAPI, Body, status
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel, Field
from google.cloud import translate_v2 as translate
import six
import logging
def translate_text(translate_client, src_text, tgt_lang):
if isinstance(src_text, six.binary_type):
src_text = src_text.decode("utf-8")
result = translate_client.translate(src_text, target_language=tgt_lang)
translated = result["translatedText"]
translated = ""
return translated
class Data(BaseModel):
src_lang: str = Field(..., min_length=2, max_length=5)
src_text: str = Field(..., min_length=1, max_length=128)
tgt_lang: str = Field(..., min_length=2, max_length=5)
logging.basicConfig(filename="./log.log", level=logging.DEBUG)
app = FastAPI()
async def startup_event():
app.google_client = translate.Client()
async def shutdown_event():
del app.google_client
@app.post("/", status_code=status.HTTP_200_OK)
async def main(*, data: Data = Body(...)):
body = jsonable_encoder(data)
src_lang = body["src_lang"]
src_text = body["src_text"]
tgt_lang = body["tgt_lang"]
tgt_text = translate_text(app.google_client, src_text, tgt_lang)
body.update({"tgt_text": tgt_text})
return JSONResponse(content=jsonable_encoder(body))
Starting new HTTPS connections (1)
이외의 커넥션은 생성되지 않음- 즉, 처음 커넥션이 생성되면 첫 요청부터 마지막 요청까지 담당함
- 이전 요청이 완료되면 동일한 커넥션에서 다음 요청-응답 프로세스 진행함
- 결과적으로, Path operation 함수에
키워드를 붙인다고 해도 코드 내부에 Blocking I/O가 존재한다면 블로킹됨
- 총 소요시간은
초이며, 초당 처리 수는5.48
2) async def
함수에서 run_in_threadpool()
메서드를 이용하여 def
함수 호출
async def
Path operation 함수에서run_in_threadpool()
메서드를 이용하여def
함수인 Google Translation API 요청
from fastapi import FastAPI, Body, status
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel, Field
from google.cloud import translate_v2 as translate
import six
import logging
from fastapi.concurrency import run_in_threadpool
def translate_text(translate_client, src_text, tgt_lang):
if isinstance(src_text, six.binary_type):
src_text = src_text.decode("utf-8")
result = translate_client.translate(src_text, target_language=tgt_lang)
translated = result["translatedText"]
translated = ""
return translated
class Data(BaseModel):
src_lang: str = Field(..., min_length=2, max_length=5)
src_text: str = Field(..., min_length=1, max_length=128)
tgt_lang: str = Field(..., min_length=2, max_length=5)
logging.basicConfig(filename="./log.log", level=logging.DEBUG)
app = FastAPI()
async def startup_event():
app.google_client = translate.Client()
async def shutdown_event():
del app.google_client
@app.post("/", status_code=status.HTTP_200_OK)
async def main(*, data: Data = Body(...)):
body = jsonable_encoder(data)
src_lang = body["src_lang"]
src_text = body["src_text"]
tgt_lang = body["tgt_lang"]
tgt_text = await run_in_threadpool(
translate_text, app.google_client, src_text, tgt_lang
body.update({"tgt_text": tgt_text})
return JSONResponse(content=jsonable_encoder(body))
- 생성할 수 있는 커넥션 수(현재
개 - 맨 아래 정리에서 이유 확인)까지 커넥션을 생성함 - 이전 요청이 완료되지 않아도 다음 요청을 받아서 요청-응답 프로세스를 진행함
- 결과적으로, Path operation 함수에
키워드를 붙이는 경우 코드 내부에 Blocking I/O를 비동기 처리(run_in_threadpool()
메서드 이용)해야 비동기 방식으로 작동함
- 총 소요시간은
초이며, 초당 처리 수는69.21
3) def
함수에서 def
함수 호출
Path operation 함수에서def
함수인 Google Translation API 요청 코드
from fastapi import FastAPI, Body, status
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel, Field
from google.cloud import translate_v2 as translate
import six
import logging
def translate_text(translate_client, src_text, tgt_lang):
if isinstance(src_text, six.binary_type):
src_text = src_text.decode("utf-8")
result = translate_client.translate(src_text, target_language=tgt_lang)
translated = result["translatedText"]
translated = ""
return translated
class Data(BaseModel):
src_lang: str = Field(..., min_length=2, max_length=5)
src_text: str = Field(..., min_length=1, max_length=128)
tgt_lang: str = Field(..., min_length=2, max_length=5)
logging.basicConfig(filename="./log.log", level=logging.DEBUG)
app = FastAPI()
async def startup_event():
app.google_client = translate.Client()
async def shutdown_event():
del app.google_client
@app.post("/", status_code=status.HTTP_200_OK)
def main(*, data: Data = Body(...)):
body = jsonable_encoder(data)
src_lang = body["src_lang"]
src_text = body["src_text"]
tgt_lang = body["tgt_lang"]
tgt_text = translate_text(app.google_client, src_text, tgt_lang)
body.update({"tgt_text": tgt_text})
return JSONResponse(content=jsonable_encoder(body))
- 생성할 수 있는 커넥션 수(현재
개)까지 커넥션을 생성함 - 이전 요청이 완료되지 않아도 다음 요청을 받아서 요청-응답 프로세스를 진행함
- 결과적으로, 코드 내부에 Blocking I/O를 비동기 처리하지 않아도 비동기 방식으로 작동하려면 Path operation 함수에
키워드를 붙이지 않아야 함
- 총 소요시간은
초이며, 초당 처리 수는42.36
4) 테스트 요약
블로킹 | 총 소요시간 | 초당 처리 수 | |
async def → def | Yes | 18.232 | 5.48 |
run_in_threadpool() | No | 1.445 | 69.21 |
def → def | No | 2.361 | 42.36 |
3. 정리
1) asyncio
- Python에서 비동기 처리를 할 때는
를 이용함 asyncio
는 CPU bound보다는 Blocking I/O를 처리하는 데 초점을 둠(CPU bound도 저수준 API를 이용하여 구현하면async
는 네이티브 코루틴(Coroutine) 개념을 이용하여 구현되었으며, 기본적인 동작 방식은 하나의 쓰레드가 마치 멀티 태스킹처럼 메인 루틴과 코루틴을 번갈아가며 코드를 실행함async def
함수일 때 모두 비동기 처리를 할 수 있지만, 두 가지 상황에 대한 처리 방식이 다름
async def
함수일 때는 네이티브 코루틴의 개념을 이용하여 비동기 처리를 함- 이벤트 루프 안에 태스크를 넣어 일반적으로 모든 태스크가 끝날 때까지(중간에 끊는 경우도 있음) 이벤트 루프를 돌리는 것으로 구현함
등과 같이 네이티브 코루틴으로 구현된 Blocking I/O는await
할 수 있음
함수일 때는 프로세스 풀(Process pool) 또는 쓰레드 풀(Thread pool)을 이용하여 비동기 처리를 함def
함수일 때는await
할 수 없기 때문에 네이티브 코루틴 방식으로는 비동기 처리를 할 수 없음- 이런 이유로 프로세스/쓰레드 풀을 이용하는 것이며, Context switching을 통해 프로세스/쓰레드를 번갈아가며 코드를 실행하는 것으로 비동기 처리를 할 수 있는 것임
- 즉,
async def
함수는 멀티 태스킹 방식처럼 비동기 처리를 하는 것이고,def
함수는 멀티 프로세싱/쓰레딩 방식으로 비동기 처리를 하는 것임
함수의 비동기 처리는 프로세스/쓰레드 풀을 이용하여 해결한다고 했는데, Python GIL 때문에 멀티 쓰레딩을 사용하지 않는 것이 좋다고 생각할 수 있지만, GIL은 CPU bound에서 공유 자원에 대해 발생할 수 있는 Race condition 문제 때문에 필요한 것임- 그래서
함수의 비동기 처리 방식은 CPU bound일 경우 멀티 프로세싱 방식으로, Blocking I/O일 경우 멀티 쓰레딩 방식으로 구분해서 해결함 - 이 때 사용하는 메서드가
메서드이고, 호출 형태는run_in_executor(executor, func, *args)
임 - CPU bound인 경우
를 생성하고, Blocking I/O인 경우ThreadPoolExecutor
)를 생성하여run_in_executor()
메서드에 첫 번째 인자로 넣음
- 다음은
공식 문서에 있는run_in_executor()
메서드 샘플 코드임 - CPU bound일 때와 Blocking I/O를 구분해서
를 다르게 생성한 후 인자에 넣는 것을 확인할 수 있음
import asyncio
import concurrent.futures
def blocking_io():
# File operations (such as logging) can block the
# event loop: run them in a thread pool.
with open('/dev/urandom', 'rb') as f:
return f.read(100)
def cpu_bound():
# CPU-bound operations will block the event loop:
# in general it is preferable to run them in a
# process pool.
return sum(i * i for i in range(10 ** 7))
async def main():
loop = asyncio.get_running_loop()
## Options:
# 1. Run in the default loop's executor:
result = await loop.run_in_executor(
None, blocking_io)
print('default thread pool', result)
# 2. Run in a custom thread pool:
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, blocking_io)
print('custom thread pool', result)
# 3. Run in a custom process pool:
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, cpu_bound)
print('custom process pool', result)
2) FastAPI 프레임워크
pip install uvicorn[standard]
로 Uvicorn을 설치한 경우, CLI에서uvicorn
을 실행하면uvloop
라는 이벤트 루프를 생성한 상태가 됨- Uvicorn 코드를 확인해보면
의 추상 클래스를 상속받아 구현되었으며,run_in_executor()
도 동일하게 구현되었음
- 마찬가지로
의 비동기 처리 방식처럼 네이티브 코루틴으로 구현되어 있어야만 프로세스/쓰레드 풀을 사용하지 않고 비동기 처리를 할 수 있음 - 그래서
Path operation 함수에서def
함수를 호출하는 경우, Uvicorn이def
Path operatoin 함수를 호출할 때run_in_executor()
메서드에 넣어 실행하기 때문에 멀티 쓰레딩 방식(디폴트는 Blocking I/O)으로 비동기 처리가 되는 것임 async def
Path operation 함수에서def
함수를 호출하는 방식은 결국async def
Path operation 함수 안에서 이벤트 루프 또는 프로세스/쓰레드 풀 방식의 비동기 처리 코드가 구현되지 않았기 때문에 블로킹되는 것임
- 위의 테스트용 코드에서
함수인 Google Translation API를 비동기로 호출하기 위해run_in_threadpool()
의 인자로def
함수를 넣었음
from fastapi.concurrency import run_in_threadpool
tgt_text = await run_in_threadpool(
translate_text, app.google_client, src_text, tgt_lang
메서드의 인자로ThreadPoolExecutor
를 넣어야 하는 것을 FastAPI에서는fastapi.concurrency.run_in_threadpool()
메서드로 간단하게 해결할 수 있게 구현함(이상한 점이run_in_processpool()
메서드는 없음 - 못찾았을 수도 있음)
- 추가적으로 최대 커넥션 수(최대 쓰레드 수)는
개였는데, 그 이유는 최대 워커(max_workers
이거나 주어지지 않았다면, 최대 쓰레드 수는 기본값으로프로세서 수 * 5
를 사용함 - 그래서 테스트 환경처럼
코어 CPU 환경이라면 최대 쓰레드 수는40
개가 되는 것임
3) 결론
(1) 네이티브 코루틴을 구현할 수 있거나 써드 파티 라이브러리(aiohttp
등)를 이용하는 경우
1] CPU bound
async def Path operation 함수 → await CPU bound 함수()
from fastapi import FastAPI
import asyncio
async def cpu_bound():
async def main():
await cpu_bound()
2] Blocking I/O
async def Path operation 함수 → await Blocking I/O 함수()
from fastapi import FastAPI
import asyncio
async def blocking_io():
async def main():
await blocking_io()
(2) 네이티브 코루틴을 구현할 수 없는 경우
1] CPU bound
2] Blocking I/O