Skip to content

4. Next Steps


1. Using Celery in Application

1) Project

  • 프로젝트 레이아웃은 다음과 같다.
  • __init__.py 파일을 생성하여 하나의 패키지로 만든다.


proj
├── __init__.py
├── celery.py
└── tasks.py

0 directories, 3 files


  • 먼저 proj/celery.py 파일에 다음과 같이 작성한다.


from celery import Celery

app = Celery(
    "proj",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
    include=["proj.tasks"],
)

app.conf.update(result_expires=3600)

if __name__ == "__main__":
    app.start()


  • 위의 모듈에서는 Celery 인스턴스를 생성했다.
  • 프로젝트 내에서 Celery를 사용하려면 이 인스턴스를 가져오기만 하면된다.
  • Celery 구성과 관련된 내용으로 작성하면 된다.
  • broker 인수는 사용할 브로커의 URL을 지정하고, backend 인수는 사용할 결과 백엔드를 지정한다.
  • 백엔드는 태스크 상태 및 결과를 추적하는 데 사용된다.
  • include 인수는 워커가 시작할 때 가져올 모듈 목록이며, 워커가 태스크를 찾을 수 있도록 태스크 모듈을 추가해야 한다.


  • 이제 proj/tasks.py 파일에 다음과 같이 작성한다.


from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)


2) Starting the worker

  • Celery 프로그램을 사용하여 워커를 시작한다.


celery -A proj worker -l INFO
celery@SAEMui-MacBookPro-13.local v5.2.2 (dawn-chorus)

macOS-10.16-x86_64-i386-64bit 2021-12-29 17:20:51

[config]
.> app:         proj:0x7f7e793bbfa0
.> transport:   redis://localhost:6379/0
.> results:     redis://localhost:6379/1
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery

[tasks]
  . proj.tasks.add
  . proj.tasks.mul
  . proj.tasks.xsum

[2021-12-29 17:20:51,531: INFO/MainProcess] Connected to redis://localhost:6379/0
[2021-12-29 17:20:51,541: INFO/MainProcess] mingle: searching for neighbors
[2021-12-29 17:20:52,584: INFO/MainProcess] mingle: all alone
[2021-12-29 17:20:52,666: INFO/MainProcess] celery@SAEMui-MacBookPro-13.local ready.


  • 위의 내용은 다음과 같다.


1] transport

  • Celery 모듈의 broker 인수에 지정한 URL을 나타낸다.
  • -b 옵션을 사용하여 다른 브로커를 지정할 수도 있다.

2] concurrency

  • 태스크를 동시에 처리하는 데 사용되는 Prefork 워커 프로세스의 수이다.
  • 모든 태스크가 처리 중이라면 새 태스크는 처리 중인 태스크 하나가 완료될 때까지 기다려야 한다.
  • 기본 concurrency 수는 해당 머신의 CPU 수이며, -c 옵션을 사용하여 변경할 수 있다.
  • 기본 Prefork 풀을 포함하여 Eventlet, Gevent 등도 지원한다.

3] task events

  • 워커에서 발생하는 태스크에 대해 Celery가 모니터링 메시지(이벤트)를 보내도록 하는 옵션이다.

4] queues

  • 워커가 태스크를 사용할 큐 목록이다.
  • 워커는 한 번에 여러 큐에서 소비하도록 지시할 수 있으며, 라우팅과 관련하여 우선 순위 등을 지정하기 위한 수단으로 특정 워커에게 메시지를 라우팅하는 데 사용된다.


3) In the background

  • 프로덕션에서는 데몬으로 백그라운드에서 워커를 실행한다.
  • 이때 celery multi 명령을 사용하여 하나 이상의 워커를 백그라운드에서 실행한다.


sudo celery multi start w1 -A proj -l INFO
celery multi v5.2.2 (dawn-chorus)
> Starting nodes...
 > w1@SAEMui-MacBookPro-13.local: OK


  • 백그라운드 실행을 중지하려면 다음과 같이 입력한다.


sudo celery multi stop w1 -A proj -l INFO
celery multi v5.2.2 (dawn-chorus)
> Stopping nodes...
 > w1@SAEMui-MacBookPro-13.local: TERM -> 92756


  • celery multi 명령은 워커에 대한 정보를 저장하지 않으므로 다시 시작할 때 동일한 명령 인수를 사용해야 한다.
  • 중지할 때도 동일한 Pidfile 및 Logfile 인수만 사용해야 한다.


2. Calling Tasks

  • delay() 메서드를 사용하여 태스크를 호출할 수 있다.


>>> from proj.tasks import add

>>> add.delay(2, 2)


  • delay() 메서드는 apply_async() 메서드를 줄인 것이다.


>>> add.apply_async((2, 2))


  • apply_async() 메서드를 사용하면 실행할 시간(카운트다운), 전송해야 하는 큐 등과 같은 실행 옵션을 지정할 수 있다.


>>> add.apply_async((2, 2), queue="lopri", countdown=10)


  • 위의 코드에서 태스크는 lopri라는 큐로 전송되고, 메시지가 전송된 후 빠르면 10초 후에 실행된다.


  • 만약 태스크를 delay()apply_async() 메서드를 사용하지 않고 직접 적용하면 현재 프로세스에서 실행되므로 메시지가 전송되지 않는다.


>>> add(2, 2)
4


  • delay()apply_async() 메서드는 태스크 실행 상태를 추적하는 데 사용할 수 있는 AsyncResult 인스턴스를 반환한다.
  • 그러므로 상태를 어딘가에 저장할 수 있도록 결과 백엔드를 활성화해야 한다.
  • 결과 백엔드가 구성되어 있으면 태스크의 반환 값을 검색할 수 있다.


>>> res = add.delay(2, 2)
>>> res.get(timeout=1)
4


  • id 속성으로 태스크의 ID 확인할 수 있다.


>>> res.id
'f364cb73-20e1-46bb-af6e-97f0d3a3141c'


  • 태스크에서 예외가 발생한 경우 예외 및 트레이스백을 검사할 수 있다.
  • result.get()은 모든 에러를 출력한다.


>>> res = add.delay(2, "2")

>>> res.get(timeout=1)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/celery/result.py", line 224, in get
    return self.backend.wait_for_pending(
  File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/celery/backends/asynchronous.py", line 223, in wait_for_pending
    return result.maybe_throw(callback=callback, propagate=propagate)
  File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/celery/result.py", line 336, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/celery/result.py", line 329, in throw
    self.on_ready.throw(*args, **kwargs)
  File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/vine/promises.py", line 234, in throw
    reraise(type(exc), exc, tb)
  File "/Users/SAEMC/opt/anaconda3/envs/saemc/lib/python3.8/site-packages/vine/utils.py", line 30, in reraise
    raise value
TypeError: unsupported operand type(s) for +: 'int' and 'str'


  • 모든 에러가 출력되지 않도록 하려면 propagate 인수를 전달하면 된다.


>>> res.get(propagate=False)
TypeError: unsupported operand type(s) for +: 'int' and 'str'


  • 위의 경우 예외 인스턴스를 반환하며, 태스크의 성공 또는 실패 여부를 확인할 수 있다.


>>> res.failed()
True

>>> res.successful()
False


  • 또한 다음과 같이 state 속성으로 확인할 수도 있다.


>>> res.state
'FAILURE'


  • 태스크는 단일 상태로 존재할 수 있지만, 여러 상태를 거쳐 진행된다.
  • 일반적인 태스크의 단계는 다음과 같다.


PENDING -> STARTED -> SUCCESS


  • STARTED 상태는 task_track_started 설정이 활성화된 경우 또는 태스크에 대해 @task(track_started_True) 옵션이 설정된 경우에만 기록된다.
  • PENDING 상태는 실제로 기록된 상태가 아니라 알 수 없는 태스크 ID의 기본 상태이다.


>>> from proj.celery import app

>>> res = app.AsyncResult("unknown-id")
>>> res.state
'PENDING'


  • 만약 존재하는 태스크 ID를 입력해서 확인하면 상태를 확인할 수 있다.


>>> res = add.delay(2, 2)
>>> res.state
'SUCCESS'
>>> res.id
'f9ba8c71-7fe4-43e1-a490-8070667491db'

>>> res = app.AsyncResult('f9ba8c71-7fe4-43e1-a490-8070667491db')
>>> res.state
'SUCCESS'


3. Canvas: Designing Work-flows

  • 대부분의 워크플로우에서는 delay() 메서드를 사용하여 태스크를 호출하는 방법을 사용한다.
  • 하지만 때로는 태스크 호출의 시그니처를 다른 프로세스에 전달하거나 다른 함수에 대한 인수로 전달해야 한다.
  • 그래서 signature() 메서드를 사용하는데, 단일 태스크 호출의 인수 및 실행 옵션을 함수로 전달하거나 직렬화하여 보낼 수 있는 방식으로 래핑한다.
  • 다음과 같이 인수 (2, 2)10초 카운트다운을 사용하여 add 태스크에 대한 시그니처를 만들 수 있다.


>>> sig = add.signature((2, 2), countdown=10)
>>> sig.get()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: get expected at least 1 argument, got 0

>>> res = sig.delay()
>>> res.get()
4


  • 다음과 같이 s() 메서드로 줄여서 사용할 수 있다.


>>> sig = add.s(2, 2)

>>> res = sig.delay()
>>> res.get()
4


  • 또한 s() 메서드의 인수로 하나만 지정한 후 시그니처 호출 시 나머지 인수를 지정할 수 있다.


>>> sig = add.s(2)

>>> res = sig.delay(8)
>>> res.get()
10


  • 키워드 인수는 위치 인수 뒤에 추가할 수 있다.


>>> sig = add.s(2, 2, debug=True)


4. The Primitives

  • 시그니처를 잘 활용하려면 프리머티브를 사용해야 한다.
  • 프리머티브는 시그니처 객체 자체이므로 복잡한 워크플로우를 구성하기 위해 이러한 시그니처 객체끼리 다양한 방법으로 결합할 수 있다.


1) Groups

  • group은 태스크 목록을 병렬로 호출하고 결과를 그룹으로 검사하고 반환 값을 순서대로 검색할 수 있는 결과 인스턴스를 반환한다.


>>> from celery import group
>>> from proj.tasks import add

>>> g = group(add.s(i, i) for i in range(10))
>>> res = g()
>>> res.get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


  • 또한 다음과 같이 사용할 수도 있다.


>>> g = group(add.s(i) for i in range(10))
>>> res = g(10)
>>> res.get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]


2) Chains

  • chain은 한 태스크가 반환된 후 다른 태스크가 호출되도록 연결할 수 있다.


>>> from celery import chain
>>> from proj.tasks import add, mul

# (4 + 4) * 8
>>> c = chain(add.s(4, 4) | mul.s(8))
>>> res = c()
>>> res.get()
64


  • 또한 다음과 같이 사용할 수도 있다.


# (? + 4) * 8
>>> c = chain(add.s(4) | mul.s(8))
>>> res = g(4)
>>> res.get()
64


3) Chords

  • chord는 콜백 태스크가 있을 때 사용한다.


>>> from celery import chord
>>> from proj.tasks import add, xsum

>>> c = chord((add.s(i, i) for i in range(10)), xsum.s())
>>> res = c()
>>> res.get()
90


  • 이러한 프리머티브는 모두 시그니처 타입이므로 원하는 대로 거의 다 결합할 수 있다.


>>> upload_document.s(file) | group(apply_filter.s() for filter in filters)

References