我们有一些运行时间非常长的 celery 任务(不幸的是,我们不能将其分割成子任务),但杀死中间任务是完全安全的。
当我们需要部署新代码时,这很糟糕,因为当我们需要退回所有工作人员(如果其中一个正在运行)时,可能需要半小时而不是 30 秒。
有没有办法从任务内部找出它的工作人员是否正在等待它,以便它可以终止/重新启动/无论什么,以便这些可以安全杀死的任务可以定期检查并自行终止?
这是我发现的一种方法(适用于 celery 5.3.6):
import celery
from your_app.celery import app
def my_task_is_still_active():
task_id = celery.current_task.request.id
active_task_ids = {
task["id"]
for tasks in (app.control.inspect().active() or {}).values()
for task in tasks
}
return task_id in active_task_ids
current_task
和app.control.inspect()
的文档。