确保在 Django REST Framework 中使用 Celery 顺序执行任务?

问题描述 投票:0回答:1

我有一个 Django REST Framework 项目,我需要顺序调用两个 Celery 任务。具体来说,我需要调用

first_function
,然后在进行一些操作后,调用
second_function
,确保
second_function
仅在
first_function
完成后运行。

# tasks.py
from celery import shared_task

@shared_task
def first_function():
    # Logic for the first function

@shared_task
def second_function():
    # Logic for the second function

这是我的观点.py

first_function.delay()

# some codes ....
# now i need to call my second_function.delay()
   
second_function.delay()

我无法调用

chain(first_function.s(), second_function.s()).delay()
,因为我需要在代码开头调用
first_function
。如果出现异常,
first_function
应该独立运行。但是,如果一切正常,我只想在确保
second_function
已完成后才调用
first_function

我担心的是:

  • 如果同时向视图发出多个请求,我希望
    second_function
    正确等待相应的
    first_function
    完成。
  • 我对如何确保
    second_function
    在与同一请求相关的特定
    first_function
    之后运行有点困惑?(注意:我不能在代码中间添加睡眠或任何阻塞代码。)

任何使用 Celery 处理这种情况的指导或最佳实践将不胜感激!

django django-rest-framework celery
1个回答
0
投票

您可以在第二个任务开始时读取第一个任务的状态,并等到第一个任务状态为“SUCCESS”。

from celery.result import AsyncResult

first_task = first_function.delay()

# some codes ....

while True:
    # Poll the first task until it's completed
    result = AsyncResult(first_task.id)
    if result.state == 'SUCCESS':
        # If the first task is successful, call the second task
        second_task = second_function.delay(result.result)
    elif result.state in ['FAILURE', 'REVOKED']:
        break
    
    # Sleep for a short while before checking again
    time.sleep(2)

如果你不想使用 sleep 那么你可以引入第三个任务来避免阻塞主线程

from celery import shared_task
from celery.result import AsyncResult

@shared_task
def first_function():
    # Your first function logic here
    pass

@shared_task
def second_function(result):
    # Your second function logic here
    # 'result' is the result of first_function
    pass

@shared_task(bind=True)
def monitor_tasks(self, first_task_id):
    # Monitor the first task
    result = AsyncResult(first_task_id)

    if result.state == 'SUCCESS':
        # If the first task is successful, call the second task
        second_function.delay(result.result)
    elif result.state in ['FAILURE', 'REVOKED']:
        # If the first task failed or was revoked, stop monitoring
        print("First task failed or was revoked. Not triggering the second task.")
    else:
        # Retry the monitoring task if the first task is not finished yet
        self.retry(countdown=2, max_retries=None)


# Start the first function asynchronously
first_task = first_function.delay()

# some codes ....

# Start monitoring task
monitor_tasks.delay(first_task.id)

© www.soinside.com 2019 - 2024. All rights reserved.