我需要在celery任务中实现以下逻辑:如果满足某些条件,请关闭当前工作程序并重试该任务。
在示例任务上测试:
@app.task(bind=True, max_retries=1)
def shutdown_and_retry(self, config):
try:
raise Exception('test exection')
except Exception as exc:
print('Retry {}/{}, task id {}'.format(self.request.retries, self.max_retries, self.request.id))
app.control.shutdown(destination=[self.request.hostname]) # send shutdown signal to the current worker
raise self.retry(exc=exc, countdown=5)
print('Execute task id={} retries={}'.format(self.request.id, self.request.retries))
return 'some result'
但是它给出奇怪的结果,步骤:
celery worker -Q test_queue -A test_worker -E -c 1 -n test_worker_1
。我尝试过的:
task_reject_on_worker_lost = True
并运行相同的任务。结果:什么都没有改变。app.control.revoke(self.request.id)
,然后在工作进程中重试调用(基于this)。结果:第一次尝试后得到了相同的结果(队列中有2个任务),但是当我运行第二个工作队列时,它却没有运行。因此,该任务将丢失并且不会重试。有没有一种方法可以在app.control.shutdown()
调用期间不将原始任务推回队列?看来这是根本原因。或者,您是否可以建议其他解决方法,以实现上述正确的逻辑。
设置:RabbitMQ 3.8.2,芹菜4.1.0,python 3.5.4
celeryconfig.py中的设置:
task_acks_late = True
task_acks_on_failure_or_timeout = True
task_reject_on_worker_lost = False
task_track_started = True
worker_prefetch_multiplier = 1
worker_disable_rate_limits = True
我需要在celery任务中实现以下逻辑:如果满足某些条件,请关闭当前工作程序并重试该任务。在示例任务上进行了测试:@ app.task(bind = True,max_retries = 1)def ...
似乎配置文件中的问题是task_acks_late
。通过使用它,您说的是“仅在完成运行后才从队列中删除任务”。然后,您杀死了该工作程序,因此它从未被确认(并且您得到了该任务的重复项)。