Celery关闭人员,然后重试任务

问题描述 投票:0回答:1

我需要在celery任务中实现以下逻辑:如果满足某些条件,请关闭当前工作程序并重试该任务。

在示例任务上测试:

@app.task(bind=True, max_retries=1)
def shutdown_and_retry(self, config):
    try:
        raise Exception('test exection')
    except Exception as exc:
        print('Retry {}/{}, task id {}'.format(self.request.retries, self.max_retries, self.request.id))
        app.control.shutdown(destination=[self.request.hostname])  # send shutdown signal to the current worker
        raise self.retry(exc=exc, countdown=5)
    print('Execute task id={} retries={}'.format(self.request.id, self.request.retries))
    return 'some result'

但是它给出奇怪的结果,步骤:

  1. 运行工人:celery worker -Q test_queue -A test_worker -E -c 1 -n test_worker_1
  2. 将任务推送到“ test_queue”队列。
  3. 工人抓住了它,然后关机。我在RabbitMQ的“ test_queue”中打开了任务列表,并看到:
    • 发布者提交的原始任务,重试= 0(来自app.control.shutdown()调用);
    • 原始任务的副本(具有相同的ID),重试= 1(来自self.retry()调用。)>
  4. 然后我将另一个工作程序启动到同一队列,它捕获了该任务并关闭了。但是在Broker上,原始任务又有一个副本以相同的ID和重试= 1推送到队列中。因此,我在队列中有3个任务。所有下一次运行的工人都给+ 1个新任务排队。在这种情况下,条件max_retries = 1无效。
  5. 我尝试过的:

  1. 在celeryconfig.py中设置task_reject_on_worker_lost = True并运行相同的任务。结果:什么都没有改变。
  2. 仅将关闭调用留在工作者的任务中。结果:每次尝试都只推回原始任务(没有重复的任务),但不计入重试次数(始终设置为0);
  3. 在关机之前重新添加app.control.revoke(self.request.id),然后在工作进程中重试调用(基于this)。结果:第一次尝试后得到了相同的结果(队列中有2个任务),但是当我运行第二个工作队列时,它却没有运行。因此,该任务将丢失并且不会重试。
  4. 有没有一种方法可以在app.control.shutdown()调用期间不将原始任务推回队列?看来这是根本原因。或者,您是否可以建议其他解决方法,以实现上述正确的逻辑。

设置:RabbitMQ 3.8.2,芹菜4.1.0,python 3.5.4

celeryconfig.py中的设置:

task_acks_late = True
task_acks_on_failure_or_timeout = True
task_reject_on_worker_lost = False
task_track_started = True
worker_prefetch_multiplier = 1
worker_disable_rate_limits = True

我需要在celery任务中实现以下逻辑:如果满足某些条件,请关闭当前工作程序并重试该任务。在示例任务上进行了测试:@ app.task(bind = True,max_retries = 1)def ...

python rabbitmq celery
1个回答
0
投票

似乎配置文件中的问题是task_acks_late。通过使用它,您说的是“仅在完成运行后才从队列中删除任务”。然后,您杀死了该工作程序,因此它从未被确认(并且您得到了该任务的重复项)。

© www.soinside.com 2019 - 2024. All rights reserved.