Skip to content

3. First Steps with Celery


1. Choosing a Broker

  • Celery는 메시지를 보내고 받기 위한 솔루션이 필요하다.
  • 일반적으로 이는 메시지 브로커라고 하는 별도의 서비스 형태로 제공된다.
  • RabbitMQ 또는 Redis를 사용한다.


2. Installing Celery

  • Celery는 다음과 같이 설치한다.


pip install celery


3. Application

  • 가장 먼저 필요한 것은 Celery 인스턴스인데, 이것을 Celery 애플리케이션이라고 부른다.
  • 이 인스턴스는 태스크 생성 및 워커 관리와 같이 Celery에서 수행하려는 모든 작업의 엔트리포인트로 사용되므로 다른 모듈에서 가져올 수 있어야 한다.


  • 먼저 tasks.py 파일을 생성한 후 다음과 같이 작성한다.


from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0")

@app.task
def add(x, y):
    return x + y


  • Celery 클래스의 첫 번째 인수는 현재 모듈의 이름이다.
  • 이것은 작업이 __main__ 모듈에 정의될 때 이름이 자동으로 생성될 수 있도록 하기 위해서만 필요하다.
  • 두 번째 인수는 사용하려는 메시지 브로커의 URL이다.
  • RabbitMQ의 경우 “pyamqp://guest@localhost//”와 같이 넣어준다.


4. Running the Celery worker server

  • 이제 worker 인수로 프로그램을 실행하여 워커를 실행할 수 있다.


celery -A tasks worker --loglevel=INFO
celery@SAEMui-MacBookPro-13.local v5.2.2 (dawn-chorus)

macOS-10.16-x86_64-i386-64bit 2021-12-29 14:39:05

[config]
.> app:         tasks:0x7fda9924be80
.> transport:   redis://localhost:6379/0
.> results:     disabled://
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery

[tasks]
  . tasks.add

[2021-12-29 14:39:05,695: INFO/MainProcess] Connected to redis://localhost:6379/0
[2021-12-29 14:39:05,706: INFO/MainProcess] mingle: searching for neighbors
[2021-12-29 14:39:06,735: INFO/MainProcess] mingle: all alone
[2021-12-29 14:39:06,797: INFO/MainProcess] celery@SAEMui-MacBookPro-13.local ready.


  • -A 옵션은 모듈 이름을 지정하는 것이다.
  • worker는 워커 인스턴스를 실행하는 명령어를 나타낸다.
  • 프로덕션에서는 워커를 데몬으로 백그라운드에서 실행하도록 한다.


5. Calling the task

  • 태스크를 호출하는 경우 delay() 메서드를 사용할 수 있다.


>>> from tasks import add

>>> add.delay(4, 4)
<AsyncResult: fa738ba6-c882-405c-8c4a-c9c195c12f37>


  • 다음과 같이 워커가 태스크를 처리한 것을 확인할 수 있다.


[2021-12-29 14:43:16,375: INFO/MainProcess] Task tasks.add[fa738ba6-c882-405c-8c4a-c9c195c12f37] received
[2021-12-29 14:43:16,378: INFO/ForkPoolWorker-8] Task tasks.add[fa738ba6-c882-405c-8c4a-c9c195c12f37] succeeded in 0.0007806250000044201s: 8


  • 태스크를 호출하면 AsyncResult 인스턴스가 반환되는데, 이것은 태스크 상태를 확인하거나 태스크가 완료될 때까지 기다리거나 반환 값을 가져오는 데 사용할 수 있다.
  • 또는 태스크가 실패한 경우 예외 및 트레이스백을 가져올 수 있다.


6. Keepin Results

  • 태스크의 상태를 추적하려면 Celery가 상태를 어딘가에 저장하거나 보내야 한다.
  • 이때 MongoDB, Redis 등 백엔드를 사용하면 되는데, 위에서 작성한 tasks.py 파일의 app을 다음과 같이 수정한다.


from celery import Celery

app = Celery(
    "tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1"
)

@app.task
def add(x, y):
    return x + y


  • 워커 인스턴스를 다시 실행하면 다음과 같이 results 값이 disabled://에서 redis://localhost:6379/1로 변경된 것을 확인할 수 있다.


celery -A tasks worker --loglevel=INFO
celery@SAEMui-MacBookPro-13.local v5.2.2 (dawn-chorus)

macOS-10.16-x86_64-i386-64bit 2021-12-29 15:04:44

[config]
.> app:         tasks:0x7fa300f53e80
.> transport:   redis://localhost:6379/0
.> results:     redis://localhost:6379/1
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery

[tasks]
  . tasks.add

[2021-12-29 15:04:45,143: INFO/MainProcess] Connected to redis://localhost:6379/0
[2021-12-29 15:04:45,153: INFO/MainProcess] mingle: searching for neighbors
[2021-12-29 15:04:46,185: INFO/MainProcess] mingle: all alone
[2021-12-29 15:04:46,242: INFO/MainProcess] celery@SAEMui-MacBookPro-13.local ready.


  • 이제 백엔드가 구성되면 현재 Python 세션을 닫고 tasks 모듈을 다시 가져와 변경 사항을 적용한다.
  • 이번에는 태스크를 호출할 때 반환된 AsyncResult 인스턴스를 result 변수에 저장하여 유지한다.


>>> from tasks import add

>>> result = add.delay(4, 4)


  • ready() 메서드는 태스크가 처리를 완료했는지 여부를 반환한다.
  • 처리를 완료한 경우에는 True, 완료하지 못한 경우에는 False를 반환한다.


>>> result.ready()
False


  • 물론 결과가 완료될 때까지 다음과 같이 기다릴 수 있지만, 이는 비동기 호출을 동기 호출로 바꾸기 때문에 거의 사용되지 않는다.


>>> result.get(timeout=1)
8


  • 만약 다음과 같이 예외가 발생한 경우 get()은 예외를 다시 발생시킨다.


>>> result = add.delay(4, "a")
>>> result.ready()
True

>>> result.get()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/celery/result.py", line 220, in get
    self.maybe_throw(callback=callback)
  File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/celery/result.py", line 336, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/celery/result.py", line 329, in throw
    self.on_ready.throw(*args, **kwargs)
  File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/vine/promises.py", line 234, in throw
    reraise(type(exc), exc, tb)
  File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/vine/utils.py", line 30, in reraise
    raise value
TypeError: unsupported operand type(s) for +: 'int' and 'str'


  • 이런 경우 propagate 인수를 지정하여 오버라이드할 수 있다.


>>> result.get(propagate=False)
TypeError: unsupported operand type(s) for +: 'int' and 'str'


  • 태스크에서 예외가 발생한 경우 트레이스백에 접근할 수도 있다.


>>> result.traceback
'Traceback (most recent call last):\n  File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/celery/app/trace.py", line 451, in trace_task\n    R = retval = fun(*args, **kwargs)\n  File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/celery/app/trace.py", line 734, in __protected_call__\n    return self.run(*args, **kwargs)\n  File "/Users/SAEMC/Github/Celery/tasks.py", line 11, in add\n    return x + y\nTypeError: unsupported operand type(s) for +: \'int\' and \'str\'\n'


  • 백엔드는 리소스를 사용하여 결과를 저장하고 전송한다.
  • 리소스가 해제되도록 하려면 태스크 호출 후 반환되는 모든 AsyncResult 인스턴스에서 get() 또는 forget()을 호출해야 한다.


7. Configuration

  • Celery 클래스로 인스턴스 생성 후 다음과 같이 구성을 변경할 수 있다.


app.conf.task_serializer = "json"


  • 변경할 구성이 여러 개인 경우 다음과 같이 update()를 통해 작성하면 된다.


app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Europe/Oslo",
    enable_utc=True,
)


  • app.config_from_object() 메서드를 호출하여 구성 모듈을 사용하도록 지시할 수 있다.


app.config_from_object("celeryconfig")


  • 그리고 celeryconfig.py 파일에 다음과 같이 작성한다.


broker_url = "pyamqp://"
result_backend = "rpc://"

task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]
timezone = "Europe/Oslo"
enable_utc = True


  • celeryconfig.py 파일에 오류가 없는지 확인하려면 다음과 같이 모듈로 실행하면 된다.


python -m celeryconfig


  • 구성을 다음과 같이 변경하면 태스크를 라우팅할 수 있다.


task_routes = {
    "tasks.add": "low-priority"
}


  • 또는 라우팅하는 대신 태스크 처리 속도를 변경할 수 있다.


task_annotations = {
    "tasks.add": {"rate_limit": "10/m"}
}


  • RabbitMQ 또는 Redis를 브로커로 사용하는 경우 워커에게 런타임 시 태스크에 대한 속도 제한을 설정하도록 지시할 수 있다.


celery -A tasks control rate_limit tasks.add 10/m

References