用 Celery 取消已经执行的任务?

问题描述 投票:0回答:9

我一直在阅读文档并进行搜索,但似乎找不到直接的答案:

可以取消已经执行的任务吗? (如任务已经开始,需要一段时间,进行到一半需要取消)

我从Celery FAQ

的文档中找到了这个
>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

但我不清楚这是否会取消排队的任务,或者是否会杀死工作进程上正在运行的进程。感谢您提供的任何光芒!

python django celery message-passing
9个回答
240
投票

revoke 取消任务执行。如果任务被撤销,工作人员将忽略该任务并且不执行它。如果您不使用持久撤销,您的任务可以在工作人员重新启动后执行。

https://docs.celeryq.dev/en/stable/userguide/workers.html#worker-persistent-revokes

revoke 有一个终止选项,默认情况下为 False。如果您需要终止正在执行的任务,您需要将终止设置为True

>>> from celery.task.control import revoke
>>> revoke(task_id, terminate=True)

https://docs.celeryq.dev/en/stable/userguide/workers.html#revoke-revoking-tasks


59
投票

在 Celery 3.1 中,撤销任务的 API 发生了变化。

根据 Celery FAQ,您应该使用 result.revoke:

>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

或者如果您只有任务 ID:

>>> from proj.celery import app
>>> app.control.revoke(task_id)

38
投票

@0x00mh 的答案是正确的,但是最近的 celery docs 说使用

terminate
选项是“管理员的最后手段”,因为您可能会意外终止同时开始执行的另一个任务。可能更好的解决方案是将
terminate=True
signal='SIGUSR1'
结合起来(这会导致在任务中引发 SoftTimeLimitExceeded 异常)。


6
投票

根据 5.2.3 文档,可以运行以下命令:

    celery.control.revoke(task_id, terminate=True, signal='SIGKILL')

哪里

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) 

文档链接:https://docs.celeryq.dev/en/stable/reference/celery.app.control.html?highlight=revoke#celery.app.control.Control.revoke


6
投票

您可以使用代理和后端定义 celery 应用程序,例如:

from celery import Celery
celeryapp = Celery('app', broker=redis_uri, backend=redis_uri)

当您运行发送任务时,它会返回任务的唯一 ID:

task_id = celeryapp.send_task('run.send_email', queue = "demo")

要撤销任务,您需要 celery 应用程序和任务 ID:

celeryapp.control.revoke(task_id, terminate=True)

4
投票

另外,不尽如人意的是,还有另一种方式(abort task)来停止任务,但也有很多不可靠的地方,更多详情,请参阅: http://docs.celeryproject.org/en/latest/reference/celery.contrib.abortable.html


2
投票
from celery.app import default_app

revoked = default_app.control.revoke(task_id, terminated=True, signal='SIGKILL')
print(revoked)

1
投票

请参阅以下任务选项:time_limitsoft_time_limit(或者您可以为工作人员设置)。如果您不仅想控制执行时间,请参阅 apply_async 方法的 expires 参数。


0
投票
from celery.result import AsyncResult
task = AsyncResult(task_id)
task.revoke()
© www.soinside.com 2019 - 2024. All rights reserved.