3. First Steps with Celery
1. Choosing a Broker
- Celery는 메시지를 보내고 받기 위한 솔루션이 필요하다.
- 일반적으로 이는 메시지 브로커라고 하는 별도의 서비스 형태로 제공된다.
- RabbitMQ 또는 Redis를 사용한다.
2. Installing Celery
3. Application
- 가장 먼저 필요한 것은 Celery 인스턴스인데, 이것을 Celery 애플리케이션이라고 부른다.
- 이 인스턴스는 태스크 생성 및 워커 관리와 같이 Celery에서 수행하려는 모든 작업의 엔트리포인트로 사용되므로 다른 모듈에서 가져올 수 있어야 한다.
- 먼저
tasks.py
파일을 생성한 후 다음과 같이 작성한다.
from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379/0")
@app.task
def add(x, y):
return x + y
Celery
클래스의 첫 번째 인수는 현재 모듈의 이름이다.
- 이것은 작업이
__main__
모듈에 정의될 때 이름이 자동으로 생성될 수 있도록 하기 위해서만 필요하다.
- 두 번째 인수는 사용하려는 메시지 브로커의 URL이다.
- RabbitMQ의 경우
“pyamqp://guest@localhost//”
와 같이 넣어준다.
4. Running the Celery worker server
- 이제
worker
인수로 프로그램을 실행하여 워커를 실행할 수 있다.
celery -A tasks worker --loglevel=INFO
celery@SAEMui-MacBookPro-13.local v5.2.2 (dawn-chorus)
macOS-10.16-x86_64-i386-64bit 2021-12-29 14:39:05
[config]
.> app: tasks:0x7fda9924be80
.> transport: redis://localhost:6379/0
.> results: disabled://
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2021-12-29 14:39:05,695: INFO/MainProcess] Connected to redis://localhost:6379/0
[2021-12-29 14:39:05,706: INFO/MainProcess] mingle: searching for neighbors
[2021-12-29 14:39:06,735: INFO/MainProcess] mingle: all alone
[2021-12-29 14:39:06,797: INFO/MainProcess] celery@SAEMui-MacBookPro-13.local ready.
-A
옵션은 모듈 이름을 지정하는 것이다.
worker
는 워커 인스턴스를 실행하는 명령어를 나타낸다.
- 프로덕션에서는 워커를 데몬으로 백그라운드에서 실행하도록 한다.
5. Calling the task
- 태스크를 호출하는 경우
delay()
메서드를 사용할 수 있다.
>>> from tasks import add
>>> add.delay(4, 4)
<AsyncResult: fa738ba6-c882-405c-8c4a-c9c195c12f37>
- 다음과 같이 워커가 태스크를 처리한 것을 확인할 수 있다.
[2021-12-29 14:43:16,375: INFO/MainProcess] Task tasks.add[fa738ba6-c882-405c-8c4a-c9c195c12f37] received
[2021-12-29 14:43:16,378: INFO/ForkPoolWorker-8] Task tasks.add[fa738ba6-c882-405c-8c4a-c9c195c12f37] succeeded in 0.0007806250000044201s: 8
- 태스크를 호출하면
AsyncResult
인스턴스가 반환되는데, 이것은 태스크 상태를 확인하거나 태스크가 완료될 때까지 기다리거나 반환 값을 가져오는 데 사용할 수 있다.
- 또는 태스크가 실패한 경우 예외 및 트레이스백을 가져올 수 있다.
6. Keepin Results
- 태스크의 상태를 추적하려면 Celery가 상태를 어딘가에 저장하거나 보내야 한다.
- 이때 MongoDB, Redis 등 백엔드를 사용하면 되는데, 위에서 작성한
tasks.py
파일의 app
을 다음과 같이 수정한다.
from celery import Celery
app = Celery(
"tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1"
)
@app.task
def add(x, y):
return x + y
- 워커 인스턴스를 다시 실행하면 다음과 같이
results
값이 disabled://
에서 redis://localhost:6379/1
로 변경된 것을 확인할 수 있다.
celery -A tasks worker --loglevel=INFO
celery@SAEMui-MacBookPro-13.local v5.2.2 (dawn-chorus)
macOS-10.16-x86_64-i386-64bit 2021-12-29 15:04:44
[config]
.> app: tasks:0x7fa300f53e80
.> transport: redis://localhost:6379/0
.> results: redis://localhost:6379/1
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. tasks.add
[2021-12-29 15:04:45,143: INFO/MainProcess] Connected to redis://localhost:6379/0
[2021-12-29 15:04:45,153: INFO/MainProcess] mingle: searching for neighbors
[2021-12-29 15:04:46,185: INFO/MainProcess] mingle: all alone
[2021-12-29 15:04:46,242: INFO/MainProcess] celery@SAEMui-MacBookPro-13.local ready.
- 이제 백엔드가 구성되면 현재 Python 세션을 닫고
tasks
모듈을 다시 가져와 변경 사항을 적용한다.
- 이번에는 태스크를 호출할 때 반환된
AsyncResult
인스턴스를 result
변수에 저장하여 유지한다.
>>> from tasks import add
>>> result = add.delay(4, 4)
ready()
메서드는 태스크가 처리를 완료했는지 여부를 반환한다.
- 처리를 완료한 경우에는
True
, 완료하지 못한 경우에는 False
를 반환한다.
- 물론 결과가 완료될 때까지 다음과 같이 기다릴 수 있지만, 이는 비동기 호출을 동기 호출로 바꾸기 때문에 거의 사용되지 않는다.
>>> result.get(timeout=1)
8
- 만약 다음과 같이 예외가 발생한 경우
get()
은 예외를 다시 발생시킨다.
>>> result = add.delay(4, "a")
>>> result.ready()
True
>>> result.get()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/celery/result.py", line 220, in get
self.maybe_throw(callback=callback)
File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/celery/result.py", line 336, in maybe_throw
self.throw(value, self._to_remote_traceback(tb))
File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/celery/result.py", line 329, in throw
self.on_ready.throw(*args, **kwargs)
File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/vine/promises.py", line 234, in throw
reraise(type(exc), exc, tb)
File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/vine/utils.py", line 30, in reraise
raise value
TypeError: unsupported operand type(s) for +: 'int' and 'str'
- 이런 경우
propagate
인수를 지정하여 오버라이드할 수 있다.
>>> result.get(propagate=False)
TypeError: unsupported operand type(s) for +: 'int' and 'str'
- 태스크에서 예외가 발생한 경우 트레이스백에 접근할 수도 있다.
>>> result.traceback
'Traceback (most recent call last):\n File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/celery/app/trace.py", line 451, in trace_task\n R = retval = fun(*args, **kwargs)\n File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/celery/app/trace.py", line 734, in __protected_call__\n return self.run(*args, **kwargs)\n File "/Users/SAEMC/Github/Celery/tasks.py", line 11, in add\n return x + y\nTypeError: unsupported operand type(s) for +: \'int\' and \'str\'\n'
- 백엔드는 리소스를 사용하여 결과를 저장하고 전송한다.
- 리소스가 해제되도록 하려면 태스크 호출 후 반환되는 모든
AsyncResult
인스턴스에서 get()
또는 forget()
을 호출해야 한다.
7. Configuration
Celery
클래스로 인스턴스 생성 후 다음과 같이 구성을 변경할 수 있다.
app.conf.task_serializer = "json"
- 변경할 구성이 여러 개인 경우 다음과 같이
update()
를 통해 작성하면 된다.
app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="Europe/Oslo",
enable_utc=True,
)
app.config_from_object()
메서드를 호출하여 구성 모듈을 사용하도록 지시할 수 있다.
app.config_from_object("celeryconfig")
- 그리고
celeryconfig.py
파일에 다음과 같이 작성한다.
broker_url = "pyamqp://"
result_backend = "rpc://"
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
timezone = "Europe/Oslo"
enable_utc = True
celeryconfig.py
파일에 오류가 없는지 확인하려면 다음과 같이 모듈로 실행하면 된다.
- 구성을 다음과 같이 변경하면 태스크를 라우팅할 수 있다.
task_routes = {
"tasks.add": "low-priority"
}
- 또는 라우팅하는 대신 태스크 처리 속도를 변경할 수 있다.
task_annotations = {
"tasks.add": {"rate_limit": "10/m"}
}
- RabbitMQ 또는 Redis를 브로커로 사용하는 경우 워커에게 런타임 시 태스크에 대한 속도 제한을 설정하도록 지시할 수 있다.
celery -A tasks control rate_limit tasks.add 10/m
References