我有这个示例代码:
@router.post('/test')
def Test():
for i in range(0,10):
Testy.apply_async([i],priority=7)
return "done"
@app.task(bind=True)
def Testy(self,i):
try:
testtest(i)
return "Fine"
except Exception as e:
raise self.retry(countdown=0, priority=2)
def testtest(i):
try:
print(i)
time.sleep(2)
if random.randint(1,5) == 3:
raise ZeroDivisionError
except:
print("ERROR")
raise
在这段代码中我随机提出了一个异常。 我的正常任务优先级是 7,但我重试的任务优先级是 2(根据 celery 文档,该优先级更高) 所以我希望重试的任务比剩余的任务运行得更快
我期望这个输出:
0
1
2
3
ERROR
3
4
5
6
7
ERROR
7
8
9
但是我得到了这个:
1
2
3
ERROR
4
5
6
7
ERROR
8
9
3
7
这意味着重试的任务即使具有更高的优先级也会进入队列末尾
priority = Testy.request.delivery_info.get('priority')
print(priority)
它正确地为我打印了 7 和 2
Testy.apply_async([i],priority=2)
我尝试使用另一个具有更高优先级的任务(具有相同的队列),因此除此之外,它会转到比其他任务更快的位置。它也不起作用
我尝试了这个设置:
MAX_TASK_PRIORITY = 10
DEFAULT_TASK_PRIORITY = 7
worker_prefetch_multiplier = 1
task_acks_late = True
我用
celery==5.3.0
和rabbitmq==3.12.7
第一个问题是对如何确定优先级的误解。 根据 Celery 文档,优先级编号较低的任务被赋予较高的优先级,导致优先级为 2 的任务比其他任务更早执行。然而,即使纠正了这个误解,问题仍然存在。因此,我决定将 Celery 后端从 RabbitMQ 切换到 Redis,最终解决了问题。
我仍然不确定为什么 RabbitMQ 无法正确确定任务优先级,但 Redis 工作正常且正确。