我有一个 Django REST Framework 项目,我需要顺序调用两个 Celery 任务。具体来说,我需要调用
first_function
,然后在进行一些操作后,调用 second_function
,确保 second_function
仅在 first_function
完成后运行。
# tasks.py
from celery import shared_task
@shared_task
def first_function():
# Logic for the first function
@shared_task
def second_function():
# Logic for the second function
这是我的观点.py
first_function.delay()
# some codes ....
# now i need to call my second_function.delay()
second_function.delay()
我无法调用
chain(first_function.s(), second_function.s()).delay()
,因为我需要在代码开头调用 first_function
。如果出现异常,first_function
应该独立运行。但是,如果一切正常,我只想在确保 second_function
已完成后才调用 first_function
。
我担心的是:
second_function
正确等待相应的first_function
完成。second_function
在与同一请求相关的特定 first_function
之后运行有点困惑?(注意:我不能在代码中间添加睡眠或任何阻塞代码。)任何使用 Celery 处理这种情况的指导或最佳实践将不胜感激!
您可以在第二个任务开始时读取第一个任务的状态,并等到第一个任务状态为“SUCCESS”。
from celery.result import AsyncResult
first_task = first_function.delay()
# some codes ....
while True:
# Poll the first task until it's completed
result = AsyncResult(first_task.id)
if result.state == 'SUCCESS':
# If the first task is successful, call the second task
second_task = second_function.delay(result.result)
elif result.state in ['FAILURE', 'REVOKED']:
break
# Sleep for a short while before checking again
time.sleep(2)
如果你不想使用 sleep 那么你可以引入第三个任务来避免阻塞主线程
from celery import shared_task
from celery.result import AsyncResult
@shared_task
def first_function():
# Your first function logic here
pass
@shared_task
def second_function(result):
# Your second function logic here
# 'result' is the result of first_function
pass
@shared_task(bind=True)
def monitor_tasks(self, first_task_id):
# Monitor the first task
result = AsyncResult(first_task_id)
if result.state == 'SUCCESS':
# If the first task is successful, call the second task
second_function.delay(result.result)
elif result.state in ['FAILURE', 'REVOKED']:
# If the first task failed or was revoked, stop monitoring
print("First task failed or was revoked. Not triggering the second task.")
else:
# Retry the monitoring task if the first task is not finished yet
self.retry(countdown=2, max_retries=None)
# Start the first function asynchronously
first_task = first_function.delay()
# some codes ....
# Start monitoring task
monitor_tasks.delay(first_task.id)