芹菜优先不适用

问题描述 投票:0回答:1

我有这个示例代码:

@router.post('/test')
def Test():
    for i in range(0,10):
        Testy.apply_async([i],priority=7)
    return "done"

@app.task(bind=True)
def Testy(self,i):
    try:
        testtest(i)
        return "Fine"     
    except Exception as e:
        raise self.retry(countdown=0, priority=2)
def testtest(i):
    try:
        print(i)
        time.sleep(2)
        if random.randint(1,5) == 3:
            raise ZeroDivisionError
    except:
        print("ERROR")
        raise

在这段代码中我随机提出了一个异常。 我的正常任务优先级是 7,但我重试的任务优先级是 2(根据 celery 文档,该优先级更高) 所以我希望重试的任务比剩余的任务运行得更快

我期望这个输出:

0
1
2
3
ERROR
3
4
5
6
7
ERROR
7
8
9

但是我得到了这个:

1
2
3
ERROR
4
5
6
7
ERROR
8
9
3
7

这意味着重试的任务即使具有更高的优先级也会进入队列末尾

  • 我打印优先级以确保它具有正确的值
        priority = Testy.request.delivery_info.get('priority')
        print(priority)

它正确地为我打印了 7 和 2

  • 我也使用了这个而不是使用self和bind,但没有任何改变:
Testy.apply_async([i],priority=2)
  • 我尝试使用另一个具有更高优先级的任务(具有相同的队列),因此除此之外,它会转到比其他任务更快的位置。它也不起作用

  • 我尝试了这个设置:

MAX_TASK_PRIORITY = 10
DEFAULT_TASK_PRIORITY = 7
worker_prefetch_multiplier = 1
task_acks_late = True

我用

celery==5.3.0
rabbitmq==3.12.7

python celery priority-queue
1个回答
0
投票

第一个问题是对如何确定优先级的误解。 根据 Celery 文档,优先级编号较低的任务被赋予较高的优先级,导致优先级为 2 的任务比其他任务更早执行。然而,即使纠正了这个误解,问题仍然存在。因此,我决定将 Celery 后端从 RabbitMQ 切换到 Redis,最终解决了问题。

我仍然不确定为什么 RabbitMQ 无法正确确定任务优先级,但 Redis 工作正常且正确。

© www.soinside.com 2019 - 2024. All rights reserved.