4. Next Steps
1. Using Celery in Application
1) Project
- 프로젝트 레이아웃은 다음과 같다.
__init__.py
파일을 생성하여 하나의 패키지로 만든다.
- 먼저
proj/celery.py
파일에 다음과 같이 작성한다.
from celery import Celery
app = Celery(
"proj",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1",
include=["proj.tasks"],
)
app.conf.update(result_expires=3600)
if __name__ == "__main__":
app.start()
- 위의 모듈에서는 Celery 인스턴스를 생성했다.
- 프로젝트 내에서 Celery를 사용하려면 이 인스턴스를 가져오기만 하면된다.
- Celery 구성과 관련된 내용으로 작성하면 된다.
broker
인수는 사용할 브로커의 URL을 지정하고,backend
인수는 사용할 결과 백엔드를 지정한다.- 백엔드는 태스크 상태 및 결과를 추적하는 데 사용된다.
include
인수는 워커가 시작할 때 가져올 모듈 목록이며, 워커가 태스크를 찾을 수 있도록 태스크 모듈을 추가해야 한다.
- 이제
proj/tasks.py
파일에 다음과 같이 작성한다.
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
2) Starting the worker
- Celery 프로그램을 사용하여 워커를 시작한다.
celery@SAEMui-MacBookPro-13.local v5.2.2 (dawn-chorus)
macOS-10.16-x86_64-i386-64bit 2021-12-29 17:20:51
[config]
.> app: proj:0x7f7e793bbfa0
.> transport: redis://localhost:6379/0
.> results: redis://localhost:6379/1
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. proj.tasks.add
. proj.tasks.mul
. proj.tasks.xsum
[2021-12-29 17:20:51,531: INFO/MainProcess] Connected to redis://localhost:6379/0
[2021-12-29 17:20:51,541: INFO/MainProcess] mingle: searching for neighbors
[2021-12-29 17:20:52,584: INFO/MainProcess] mingle: all alone
[2021-12-29 17:20:52,666: INFO/MainProcess] celery@SAEMui-MacBookPro-13.local ready.
- 위의 내용은 다음과 같다.
1] transport
- Celery 모듈의
broker
인수에 지정한 URL을 나타낸다. -b
옵션을 사용하여 다른 브로커를 지정할 수도 있다.
2] concurrency
- 태스크를 동시에 처리하는 데 사용되는 Prefork 워커 프로세스의 수이다.
- 모든 태스크가 처리 중이라면 새 태스크는 처리 중인 태스크 하나가 완료될 때까지 기다려야 한다.
- 기본
concurrency
수는 해당 머신의 CPU 수이며,-c
옵션을 사용하여 변경할 수 있다. - 기본 Prefork 풀을 포함하여 Eventlet, Gevent 등도 지원한다.
3] task events
- 워커에서 발생하는 태스크에 대해 Celery가 모니터링 메시지(이벤트)를 보내도록 하는 옵션이다.
4] queues
- 워커가 태스크를 사용할 큐 목록이다.
- 워커는 한 번에 여러 큐에서 소비하도록 지시할 수 있으며, 라우팅과 관련하여 우선 순위 등을 지정하기 위한 수단으로 특정 워커에게 메시지를 라우팅하는 데 사용된다.
3) In the background
- 프로덕션에서는 데몬으로 백그라운드에서 워커를 실행한다.
- 이때
celery multi
명령을 사용하여 하나 이상의 워커를 백그라운드에서 실행한다.
- 백그라운드 실행을 중지하려면 다음과 같이 입력한다.
celery multi v5.2.2 (dawn-chorus)
> Stopping nodes...
> w1@SAEMui-MacBookPro-13.local: TERM -> 92756
celery multi
명령은 워커에 대한 정보를 저장하지 않으므로 다시 시작할 때 동일한 명령 인수를 사용해야 한다.- 중지할 때도 동일한 Pidfile 및 Logfile 인수만 사용해야 한다.
2. Calling Tasks
delay()
메서드를 사용하여 태스크를 호출할 수 있다.
delay()
메서드는apply_async()
메서드를 줄인 것이다.
apply_async()
메서드를 사용하면 실행할 시간(카운트다운), 전송해야 하는 큐 등과 같은 실행 옵션을 지정할 수 있다.
- 위의 코드에서 태스크는
lopri
라는 큐로 전송되고, 메시지가 전송된 후 빠르면10
초 후에 실행된다.
- 만약 태스크를
delay()
및apply_async()
메서드를 사용하지 않고 직접 적용하면 현재 프로세스에서 실행되므로 메시지가 전송되지 않는다.
delay()
및apply_async()
메서드는 태스크 실행 상태를 추적하는 데 사용할 수 있는AsyncResult
인스턴스를 반환한다.- 그러므로 상태를 어딘가에 저장할 수 있도록 결과 백엔드를 활성화해야 한다.
- 결과 백엔드가 구성되어 있으면 태스크의 반환 값을 검색할 수 있다.
id
속성으로 태스크의 ID 확인할 수 있다.
- 태스크에서 예외가 발생한 경우 예외 및 트레이스백을 검사할 수 있다.
result.get()
은 모든 에러를 출력한다.
>>> res = add.delay(2, "2")
>>> res.get(timeout=1)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/celery/result.py", line 224, in get
return self.backend.wait_for_pending(
File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/celery/backends/asynchronous.py", line 223, in wait_for_pending
return result.maybe_throw(callback=callback, propagate=propagate)
File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/celery/result.py", line 336, in maybe_throw
self.throw(value, self._to_remote_traceback(tb))
File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/celery/result.py", line 329, in throw
self.on_ready.throw(*args, **kwargs)
File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/vine/promises.py", line 234, in throw
reraise(type(exc), exc, tb)
File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/vine/utils.py", line 30, in reraise
raise value
TypeError: unsupported operand type(s) for +: 'int' and 'str'
- 모든 에러가 출력되지 않도록 하려면
propagate
인수를 전달하면 된다.
- 위의 경우 예외 인스턴스를 반환하며, 태스크의 성공 또는 실패 여부를 확인할 수 있다.
- 또한 다음과 같이
state
속성으로 확인할 수도 있다.
- 태스크는 단일 상태로 존재할 수 있지만, 여러 상태를 거쳐 진행된다.
- 일반적인 태스크의 단계는 다음과 같다.
STARTED
상태는task_track_started
설정이 활성화된 경우 또는 태스크에 대해@task(track_started_True)
옵션이 설정된 경우에만 기록된다.PENDING
상태는 실제로 기록된 상태가 아니라 알 수 없는 태스크 ID의 기본 상태이다.
- 만약 존재하는 태스크 ID를 입력해서 확인하면 상태를 확인할 수 있다.
>>> res = add.delay(2, 2)
>>> res.state
'SUCCESS'
>>> res.id
'f9ba8c71-7fe4-43e1-a490-8070667491db'
>>> res = app.AsyncResult('f9ba8c71-7fe4-43e1-a490-8070667491db')
>>> res.state
'SUCCESS'
3. Canvas: Designing Work-flows
- 대부분의 워크플로우에서는
delay()
메서드를 사용하여 태스크를 호출하는 방법을 사용한다. - 하지만 때로는 태스크 호출의 시그니처를 다른 프로세스에 전달하거나 다른 함수에 대한 인수로 전달해야 한다.
- 그래서
signature()
메서드를 사용하는데, 단일 태스크 호출의 인수 및 실행 옵션을 함수로 전달하거나 직렬화하여 보낼 수 있는 방식으로 래핑한다. - 다음과 같이 인수
(2, 2)
와10
초 카운트다운을 사용하여add
태스크에 대한 시그니처를 만들 수 있다.
>>> sig = add.signature((2, 2), countdown=10)
>>> sig.get()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: get expected at least 1 argument, got 0
>>> res = sig.delay()
>>> res.get()
4
- 다음과 같이
s()
메서드로 줄여서 사용할 수 있다.
- 또한
s()
메서드의 인수로 하나만 지정한 후 시그니처 호출 시 나머지 인수를 지정할 수 있다.
- 키워드 인수는 위치 인수 뒤에 추가할 수 있다.
4. The Primitives
- 시그니처를 잘 활용하려면 프리머티브를 사용해야 한다.
- 프리머티브는 시그니처 객체 자체이므로 복잡한 워크플로우를 구성하기 위해 이러한 시그니처 객체끼리 다양한 방법으로 결합할 수 있다.
1) Groups
group
은 태스크 목록을 병렬로 호출하고 결과를 그룹으로 검사하고 반환 값을 순서대로 검색할 수 있는 결과 인스턴스를 반환한다.
>>> from celery import group
>>> from proj.tasks import add
>>> g = group(add.s(i, i) for i in range(10))
>>> res = g()
>>> res.get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
- 또한 다음과 같이 사용할 수도 있다.
>>> g = group(add.s(i) for i in range(10))
>>> res = g(10)
>>> res.get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
2) Chains
chain
은 한 태스크가 반환된 후 다른 태스크가 호출되도록 연결할 수 있다.
>>> from celery import chain
>>> from proj.tasks import add, mul
# (4 + 4) * 8
>>> c = chain(add.s(4, 4) | mul.s(8))
>>> res = c()
>>> res.get()
64
- 또한 다음과 같이 사용할 수도 있다.
3) Chords
chord
는 콜백 태스크가 있을 때 사용한다.
>>> from celery import chord
>>> from proj.tasks import add, xsum
>>> c = chord((add.s(i, i) for i in range(10)), xsum.s())
>>> res = c()
>>> res.get()
90
- 이러한 프리머티브는 모두 시그니처 타입이므로 원하는 대로 거의 다 결합할 수 있다.