我使用 celery 2.4.1 和 python 2.6、rabbitmq 后端和 django。我希望如果工作人员关闭,我的任务能够正确清理。据我所知,您无法提供任务析构函数,因此我尝试连接到 worker_shutdown 信号。
注意:AbortableTask仅适用于数据库后端,所以我不能使用它。
from celery.signals import worker_shutdown
@task
def mytask(*args)
obj = DoStuff()
def shutdown_hook(*args):
print "Worker shutting down"
# cleanup nicely
obj.stop()
worker_shutdown.connect(shutdown_hook)
# blocking call that monitors a network connection
obj.stuff()
但是,关闭挂钩永远不会被调用。 Ctrl-C'ing 工作线程不会终止任务,我必须从 shell 手动终止它。
因此,如果这不是正确的方法,我该如何允许任务正常关闭?
worker_shutdown
仅由 MainProcess
发送,而不是子池工作人员发送。
所有worker_*
信号except for worker_process_init
,请参阅MainProcess
。
但是,关闭挂钩永远不会被调用。 Ctrl-C 操作工人 不会终止任务,我必须从 shell 手动终止它。
工作线程在正常(热)关闭下永远不会终止任务。 即使任务需要几天才能完成,工作人员也不会完成关闭 直到完成。您可以将
--soft-time-limit
或 --time-limit
设置为
告诉实例何时可以终止任务。
因此,要添加任何类型的进程清理过程,您首先需要 确保任务能够真正完成。因为清理不会 在此之前被调用。
要向池工作进程添加清理步骤,您可以使用 像这样的东西:
from celery import platforms
from celery.signals import worker_process_init
def cleanup_after_tasks(signum, frame):
# reentrant code here (see http://docs.python.org/library/signal.html)
def install_pool_process_sighandlers(**kwargs):
platforms.signals["TERM"] = cleanup_after_tasks
platforms.signals["INT"] = cleanup_after_tasks
worker_process_init.connect(install_pool_process_sighandlers)
使用worker_shutting_down信号。 有关示例,请参阅 https://docs.celeryq.dev/en/stable/userguide/signals.html#worker-shutting-down 和 https://stackoverflow.com/a/55481656/237091。