celery apply_async 的回调

问题描述 投票:0回答:2

我在应用程序中使用

celery
来运行定期任务。让我们看下面的简单例子

from myqueue import Queue
@perodic_task(run_every=timedelta(minutes=1))
def process_queue():
    queue = Queue()
    uid, questions = queue.pop()
    if uid is None:
        return

    job = group(do_stuff(q) for q in questions)
    job.apply_async()

def do_stuff(question):
    try:
        ...
    except:
        ...
        raise

正如您在上面的示例中看到的,我使用

celery
来运行异步任务,但是(因为它是一个队列)我需要执行
queue.fail(uid)
以防
do_stuff
queue.ack(uid)
出现异常。在这种情况下,在这两种情况下从我的任务中进行一些回调将是非常清晰和有用的 -
on_failure
on_success

我看过一些文档,但从未见过将回调与

apply_async
一起使用的实践。可以这样做吗?

python callback celery
2个回答
51
投票

子类化 Task 类并重载 on_success 和 on_failure 函数:

from celery import Task


class CallbackTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        '''
        retval – The return value of the task.
        task_id – Unique id of the executed task.
        args – Original arguments for the executed task.
        kwargs – Original keyword arguments for the executed task.
        '''
        pass
        
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        '''
        exc – The exception raised by the task.
        task_id – Unique id of the failed task.
        args – Original arguments for the task that failed.
        kwargs – Original keyword arguments for the task that failed.
        '''
        pass

用途:

@celery.task(base=CallbackTask)  # this does the trick
def add(x, y):
    return x + y

9
投票

调用 apply_async 时,可以通过 link 和 link_err kwargs 指定成功和错误回调。 celery 文档包含一个清晰的示例:http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks

© www.soinside.com 2019 - 2024. All rights reserved.