我一直在阅读文档并进行搜索,但似乎找不到直接的答案:
可以取消已经执行的任务吗? (如任务已经开始,需要一段时间,进行到一半需要取消)
的文档中找到了这个>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()
但我不清楚这是否会取消排队的任务,或者是否会杀死工作进程上正在运行的进程。感谢您提供的任何光芒!
revoke 取消任务执行。如果任务被撤销,工作人员将忽略该任务并且不执行它。如果您不使用持久撤销,您的任务可以在工作人员重新启动后执行。
https://docs.celeryq.dev/en/stable/userguide/workers.html#worker-persistent-revokes
revoke 有一个终止选项,默认情况下为 False。如果您需要终止正在执行的任务,您需要将终止设置为True。
>>> from celery.task.control import revoke
>>> revoke(task_id, terminate=True)
https://docs.celeryq.dev/en/stable/userguide/workers.html#revoke-revoking-tasks
在 Celery 3.1 中,撤销任务的 API 发生了变化。
根据 Celery FAQ,您应该使用 result.revoke:
>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()
或者如果您只有任务 ID:
>>> from proj.celery import app
>>> app.control.revoke(task_id)
@0x00mh 的答案是正确的,但是最近的 celery docs 说使用
terminate
选项是“管理员的最后手段”,因为您可能会意外终止同时开始执行的另一个任务。可能更好的解决方案是将 terminate=True
与 signal='SIGUSR1'
结合起来(这会导致在任务中引发 SoftTimeLimitExceeded 异常)。
根据 5.2.3 文档,可以运行以下命令:
celery.control.revoke(task_id, terminate=True, signal='SIGKILL')
哪里
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
您可以使用代理和后端定义 celery 应用程序,例如:
from celery import Celery
celeryapp = Celery('app', broker=redis_uri, backend=redis_uri)
当您运行发送任务时,它会返回任务的唯一 ID:
task_id = celeryapp.send_task('run.send_email', queue = "demo")
要撤销任务,您需要 celery 应用程序和任务 ID:
celeryapp.control.revoke(task_id, terminate=True)
另外,不尽如人意的是,还有另一种方式(abort task)来停止任务,但也有很多不可靠的地方,更多详情,请参阅: http://docs.celeryproject.org/en/latest/reference/celery.contrib.abortable.html
from celery.app import default_app
revoked = default_app.control.revoke(task_id, terminated=True, signal='SIGKILL')
print(revoked)
请参阅以下任务选项:time_limit、soft_time_limit(或者您可以为工作人员设置)。如果您不仅想控制执行时间,请参阅 apply_async 方法的 expires 参数。
from celery.result import AsyncResult
task = AsyncResult(task_id)
task.revoke()