How does a Celery worker consuming from multiple queues decide which queue to consume from first?

Question (votes: 0, answers: 4)

I'm using Celery to run asynchronous background tasks, with Redis as the backend. I'm interested in how a Celery worker behaves in the following situation:

I run the worker as a daemon with `celeryd`. The worker has been assigned two queues to consume from via the `-Q` option:

celeryd -E -Q queue1,queue2

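How does the worker decide where to fetch the next task from? Does it consume tasks from `queue1` and `queue2` at random? Or does it prefer `queue1` because it is listed first in the argument passed to `-Q`?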

scheduled-tasks celery celeryd
4 answers
20
votes

According to my tests, it processes multiple queues in a round-robin fashion.

If I use this test code:

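import time

# `from celery import task` was removed in Celery 5; `shared_task` is the
# supported way to declare a task without referencing a specific app instance.
from celery import shared_task


@shared_task
def my_task(item_id):
    time.sleep(0.5)
    print('Processing item "%s"...' % item_id)


def add_items_to_queue(queue_name, items_count):
    # Publish items_count tasks, routing each one explicitly to queue_name.
    for i in range(items_count):
        my_task.apply_async(('%s-%d' % (queue_name, i),), queue=queue_name)


# All queue1 tasks are published before any queue2 or queue3 task.
add_items_to_queue('queue1', 10)
add_items_to_queue('queue2', 10)
add_items_to_queue('queue3', 5)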

and start the worker (using django-celery):

`manage.py celery worker -Q queue1,queue2,queue3`

It outputs:

Processing item "queue1-0"...
Processing item "queue3-0"...
Processing item "queue2-0"...
Processing item "queue1-1"...
Processing item "queue3-1"...
Processing item "queue2-1"...
Processing item "queue1-2"...
Processing item "queue3-2"...
Processing item "queue2-2"...
Processing item "queue1-3"...
Processing item "queue3-3"...
Processing item "queue2-3"...
Processing item "queue1-4"...
Processing item "queue3-4"...
Processing item "queue2-4"...
Processing item "queue1-5"...
Processing item "queue2-5"...
Processing item "queue1-6"...
Processing item "queue2-6"...
Processing item "queue1-7"...
Processing item "queue2-7"...
Processing item "queue1-8"...
Processing item "queue2-8"...
Processing item "queue1-9"...
Processing item "queue2-9"...

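So the worker takes one item from each queue before moving on to the next queue1 item, even though all queue1 tasks were published before any queue2 or queue3 task.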
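A minimal pure-Python model of the round-robin consumption described above may make the pattern easier to see. It is not Celery's or Kombu's code: it simply takes one message from each non-empty queue in turn. The cycle order queue1, queue3, queue2 is copied from the observed output rather than derived from the `-Q` argument, since the answer doesn't explain why queue3 came before queue2; `round_robin_consume` is a hypothetical name.

```python
from collections import deque


def round_robin_consume(queues):
    """Simplified model: take one message from each non-empty queue in turn.

    `queues` maps a queue name to its messages in publish order. This only
    illustrates round-robin scheduling; it is not Celery/Kombu code.
    """
    pending = {name: deque(msgs) for name, msgs in queues.items()}
    order = []
    while any(pending.values()):
        for name in queues:  # visit the queues in a fixed cyclic order
            if pending[name]:
                order.append(pending[name].popleft())
    return order


# Cycle order taken from the observed output (queue1, queue3, queue2).
queues = {
    "queue1": [f"queue1-{i}" for i in range(10)],
    "queue3": [f"queue3-{i}" for i in range(5)],
    "queue2": [f"queue2-{i}" for i in range(10)],
}
consumed = round_robin_consume(queues)
print(consumed[:6])
# ['queue1-0', 'queue3-0', 'queue2-0', 'queue1-1', 'queue3-1', 'queue2-1']
```

Once queue3 runs dry after five rounds, the model keeps alternating between queue1 and queue2, which matches the tail of the output above.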

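Note: As @WarLord pointed out, this behavior holds only when `CELERYD_PREFETCH_MULTIPLIER` is set to 1. If it is greater than 1, items are fetched from the queues in batches. For example, with 4 processes and the prefetch multiplier set to 4, 16 items are pulled from the queues at once, so you won't get exactly the output above, although it will still roughly follow round-robin.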


9
votes

Note: This answer is outdated. Recent versions of Celery work very differently from how they did in 2013.

A worker consuming from multiple queues consumes tasks in FIFO order, and that FIFO order is maintained across all of its queues.

Example:

Queue 1: (t1, t2, t5, t7)

Queue 2: (t0, t3, t4, t6)

Assume 0-7 is the order in which the tasks were published.

The consumption order is t0, t1, t2, t3, t4, t5, t6, t7.
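For reference, the ordering this outdated answer describes amounts to merging the queues by publish order. The sketch below assumes each task carries a global publish sequence number (a hypothetical field; this is a model of the example, not how current Celery consumes messages) and reproduces the example with `heapq.merge`.

```python
import heapq


def fifo_across_queues(*queues):
    """Model of the behavior described above: tasks from several queues are
    consumed strictly in global publish order.

    Each queue is a list of (publish_seq, task_name) pairs, already FIFO.
    """
    merged = heapq.merge(*queues, key=lambda item: item[0])
    return [name for _, name in merged]


queue1 = [(1, "t1"), (2, "t2"), (5, "t5"), (7, "t7")]
queue2 = [(0, "t0"), (3, "t3"), (4, "t4"), (6, "t6")]
print(fifo_across_queues(queue1, queue2))
# ['t0', 't1', 't2', 't3', 't4', 't5', 't6', 't7']
```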


3
votes
Using the pyamqp broker library pointed at a RabbitMQ server, tasks are processed in a round-robin fashion. See the proof below.

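It appears that the order in which tasks are processed is determined by the broker, not by the result backend (so RabbitMQ vs. Redis as the backend is not the issue).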

Software versions:

$ pip freeze | egrep "celery|kombu|amqp"
amqp==2.5.2
celery==4.4.2
kombu==4.6.8
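
The task under test:

from time import sleep

from celery import Celery

# The app definition was omitted in the original; this one is a hypothetical
# stand-in for myapp/celery.py (the worker below is started with -A myapp.celery).
app = Celery("myapp", broker="pyamqp://guest@localhost//")


@app.task
def sleepy(name):
    print(f"Processing: {name}")
    sleep(0.5)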
Then, in another shell, queue the tasks:

from time import sleep

# Assumption: the task is importable from the module the worker loads (-A myapp.celery).
from myapp.celery import sleepy


def queue_them():
    for x in range(50):
        sleepy.apply_async(args=(f"Q1-T{x}",), queue="Q1")
    sleep(0.1)
    for x in range(20):
        sleepy.apply_async(args=(f"Q2-T{x}",), queue="Q2")
    sleep(0.1)
    sleepy.apply_async(args=("Q3-T0",), queue="Q3")
    for x in range(30):
        sleepy.apply_async(args=(f"Q2MOAR-T{x}",), queue="Q2")


# setup - get celery to setup the queues and exchanges
sleepy.apply_async(args=("nothing",), queue="Q1")
sleepy.apply_async(args=("nothing",), queue="Q2")
sleepy.apply_async(args=("nothing",), queue="Q3")

# run the test
queue_them()
In yet another shell, run celery:

$ celery worker -A myapp.celery --pool=prefork --concurrency=2 -Ofair --queues=Q1,Q3,Q2
[2020-05-05 21:59:11,547] WARNING [celery.redirected:235] Processing: Q1-T1
[2020-05-05 21:59:11,547] WARNING [celery.redirected:235] Processing: Q1-T0
[2020-05-05 21:59:12,052] WARNING [celery.redirected:235] Processing: Q1-T2
[2020-05-05 21:59:12,053] WARNING [celery.redirected:235] Processing: Q1-T3
[2020-05-05 21:59:12,556] WARNING [celery.redirected:235] Processing: Q1-T5
[2020-05-05 21:59:12,556] WARNING [celery.redirected:235] Processing: Q1-T4
[2020-05-05 21:59:13,062] WARNING [celery.redirected:235] Processing: Q1-T6
[2020-05-05 21:59:13,063] WARNING [celery.redirected:235] Processing: Q1-T7
[2020-05-05 21:59:13,565] WARNING [celery.redirected:235] Processing: Q1-T9
[2020-05-05 21:59:13,565] WARNING [celery.redirected:235] Processing: Q1-T8
[2020-05-05 21:59:14,069] WARNING [celery.redirected:235] Processing: Q1-T10
[2020-05-05 21:59:14,069] WARNING [celery.redirected:235] Processing: Q3-T0
[2020-05-05 21:59:14,571] WARNING [celery.redirected:235] Processing: Q2-T0
[2020-05-05 21:59:14,572] WARNING [celery.redirected:235] Processing: Q2-T1
[2020-05-05 21:59:15,078] WARNING [celery.redirected:235] Processing: Q1-T11
[2020-05-05 21:59:15,078] WARNING [celery.redirected:235] Processing: Q2-T2
[2020-05-05 21:59:15,581] WARNING [celery.redirected:235] Processing: Q2-T3
[2020-05-05 21:59:15,581] WARNING [celery.redirected:235] Processing: Q1-T12
[2020-05-05 21:59:16,084] WARNING [celery.redirected:235] Processing: Q1-T13
[2020-05-05 21:59:16,084] WARNING [celery.redirected:235] Processing: Q2-T4
[2020-05-05 21:59:16,586] WARNING [celery.redirected:235] Processing: Q1-T14
[2020-05-05 21:59:16,586] WARNING [celery.redirected:235] Processing: Q2-T5
[2020-05-05 21:59:17,089] WARNING [celery.redirected:235] Processing: Q1-T15
[2020-05-05 21:59:17,089] WARNING [celery.redirected:235] Processing: Q2-T6
[2020-05-05 21:59:17,591] WARNING [celery.redirected:235] Processing: Q1-T16
[2020-05-05 21:59:17,592] WARNING [celery.redirected:235] Processing: Q2-T7
[2020-05-05 21:59:18,094] WARNING [celery.redirected:235] Processing: Q1-T17
[2020-05-05 21:59:18,094] WARNING [celery.redirected:235] Processing: Q2-T8
[2020-05-05 21:59:18,597] WARNING [celery.redirected:235] Processing: Q1-T18
[2020-05-05 21:59:18,597] WARNING [celery.redirected:235] Processing: Q2-T9
[2020-05-05 21:59:19,102] WARNING [celery.redirected:235] Processing: Q1-T19
[2020-05-05 21:59:19,102] WARNING [celery.redirected:235] Processing: Q1-T20
[2020-05-05 21:59:19,607] WARNING [celery.redirected:235] Processing: Q1-T21
[2020-05-05 21:59:19,607] WARNING [celery.redirected:235] Processing: Q1-T22
[2020-05-05 21:59:20,110] WARNING [celery.redirected:235] Processing: Q1-T23
[2020-05-05 21:59:20,110] WARNING [celery.redirected:235] Processing: Q2-T10
[2020-05-05 21:59:20,614] WARNING [celery.redirected:235] Processing: Q1-T24
[2020-05-05 21:59:20,614] WARNING [celery.redirected:235] Processing: Q2-T11
[2020-05-05 21:59:21,118] WARNING [celery.redirected:235] Processing: Q1-T25
[2020-05-05 21:59:21,118] WARNING [celery.redirected:235] Processing: Q1-T26
[2020-05-05 21:59:21,622] WARNING [celery.redirected:235] Processing: Q2-T12
[2020-05-05 21:59:21,622] WARNING [celery.redirected:235] Processing: Q1-T27
[2020-05-05 21:59:22,124] WARNING [celery.redirected:235] Processing: Q1-T28
[2020-05-05 21:59:22,124] WARNING [celery.redirected:235] Processing: Q2-T13
[2020-05-05 21:59:22,627] WARNING [celery.redirected:235] Processing: Q2-T14
[2020-05-05 21:59:22,627] WARNING [celery.redirected:235] Processing: Q1-T29
[2020-05-05 21:59:23,129] WARNING [celery.redirected:235] Processing: Q1-T31
[2020-05-05 21:59:23,129] WARNING [celery.redirected:235] Processing: Q1-T30
[2020-05-05 21:59:23,631] WARNING [celery.redirected:235] Processing: Q2-T15
[2020-05-05 21:59:23,632] WARNING [celery.redirected:235] Processing: Q1-T32
[2020-05-05 21:59:24,134] WARNING [celery.redirected:235] Processing: Q1-T33
[2020-05-05 21:59:24,134] WARNING [celery.redirected:235] Processing: Q2-T16
[2020-05-05 21:59:24,636] WARNING [celery.redirected:235] Processing: Q2-T17
[2020-05-05 21:59:24,636] WARNING [celery.redirected:235] Processing: Q2-T18
[2020-05-05 21:59:25,138] WARNING [celery.redirected:235] Processing: Q2-T19
[2020-05-05 21:59:25,139] WARNING [celery.redirected:235] Processing: Q1-T34
[2020-05-05 21:59:25,641] WARNING [celery.redirected:235] Processing: Q1-T35
[2020-05-05 21:59:25,642] WARNING [celery.redirected:235] Processing: Q2MOAR-T0
[2020-05-05 21:59:26,144] WARNING [celery.redirected:235] Processing: Q1-T36
[2020-05-05 21:59:26,144] WARNING [celery.redirected:235] Processing: Q1-T37
[2020-05-05 21:59:26,649] WARNING [celery.redirected:235] Processing: Q2MOAR-T1
[2020-05-05 21:59:26,649] WARNING [celery.redirected:235] Processing: Q1-T38
[2020-05-05 21:59:27,153] WARNING [celery.redirected:235] Processing: Q2MOAR-T2
[2020-05-05 21:59:27,154] WARNING [celery.redirected:235] Processing: Q1-T39
[2020-05-05 21:59:27,656] WARNING [celery.redirected:235] Processing: Q2MOAR-T3
[2020-05-05 21:59:27,656] WARNING [celery.redirected:235] Processing: Q2MOAR-T4
[2020-05-05 21:59:28,159] WARNING [celery.redirected:235] Processing: Q2MOAR-T5
[2020-05-05 21:59:28,160] WARNING [celery.redirected:235] Processing: Q1-T40
[2020-05-05 21:59:28,664] WARNING [celery.redirected:235] Processing: Q2MOAR-T6
[2020-05-05 21:59:28,664] WARNING [celery.redirected:235] Processing: Q1-T41
[2020-05-05 21:59:29,167] WARNING [celery.redirected:235] Processing: Q2MOAR-T7
[2020-05-05 21:59:29,167] WARNING [celery.redirected:235] Processing: Q1-T42
When celery runs with a concurrency of 1, the results are similar:

[2020-05-05 22:01:33,879] WARNING [celery.redirected:235] Processing: Q1-T0
[2020-05-05 22:01:34,385] WARNING [celery.redirected:235] Processing: Q1-T1
[2020-05-05 22:01:34,888] WARNING [celery.redirected:235] Processing: Q1-T2
[2020-05-05 22:01:35,391] WARNING [celery.redirected:235] Processing: Q1-T3
[2020-05-05 22:01:35,894] WARNING [celery.redirected:235] Processing: Q1-T4
[2020-05-05 22:01:36,397] WARNING [celery.redirected:235] Processing: Q1-T5
[2020-05-05 22:01:36,899] WARNING [celery.redirected:235] Processing: Q3-T0
[2020-05-05 22:01:37,404] WARNING [celery.redirected:235] Processing: Q2-T0
[2020-05-05 22:01:37,907] WARNING [celery.redirected:235] Processing: Q2-T1
[2020-05-05 22:01:38,411] WARNING [celery.redirected:235] Processing: Q1-T6
[2020-05-05 22:01:38,913] WARNING [celery.redirected:235] Processing: Q2-T2
[2020-05-05 22:01:39,417] WARNING [celery.redirected:235] Processing: Q2-T3
[2020-05-05 22:01:39,919] WARNING [celery.redirected:235] Processing: Q2-T4
[2020-05-05 22:01:40,422] WARNING [celery.redirected:235] Processing: Q1-T7
[2020-05-05 22:01:40,925] WARNING [celery.redirected:235] Processing: Q2-T5
[2020-05-05 22:01:41,429] WARNING [celery.redirected:235] Processing: Q1-T8

0
votes
Since Celery 5.3, queue priority is configurable to some extent when using the Redis transport.

First, it can be adjusted by changing `queue_order_strategy`, a Redis-specific broker transport option.

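Its default value is `round_robin`, which aims to give every queue an equal chance of being consumed. Another available value is `priority`. According to the documentation, it will: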

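"Consume from queues in original order, so that if the first queue always contains messages, the rest of the queues in the list will never be consumed."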
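To contrast the two strategies, here is a toy model assuming a single consumer that always takes from the first non-empty queue in its current queue order. Under `round_robin` the queue just used is moved to the back; under `priority` the order never changes. It illustrates the documented semantics only and is not Kombu's implementation; `consume` and the queue names `high`/`low` are hypothetical.

```python
from collections import deque


def consume(queues, strategy, limit):
    """Return up to `limit` messages in the order a single consumer takes them.

    queues: dict of queue name -> list of messages, with queue names in the
    order passed to -Q. Toy model only, not Kombu's implementation.
    """
    if strategy not in ("round_robin", "priority"):
        raise ValueError(f"unknown strategy: {strategy!r}")
    pending = {name: deque(msgs) for name, msgs in queues.items()}
    cycle = list(queues)  # current queue order, initially the -Q order
    taken = []
    while len(taken) < limit and any(pending.values()):
        # Take from the first non-empty queue in the current order.
        name = next(n for n in cycle if pending[n])
        taken.append(pending[name].popleft())
        if strategy == "round_robin":
            # Move the queue just used to the back so the others go first.
            cycle.remove(name)
            cycle.append(name)
        # With "priority" the order never changes, so earlier queues always win.
    return taken


queues = {"high": ["h0", "h1", "h2"], "low": ["l0", "l1"]}
print(consume(queues, "round_robin", 5))  # ['h0', 'l0', 'h1', 'l1', 'h2']
print(consume(queues, "priority", 5))     # ['h0', 'h1', 'h2', 'l0', 'l1']

# If the first queue never runs dry, "priority" starves the others.
busy = {"high": [f"h{i}" for i in range(100)], "low": ["l0"]}
print("l0" in consume(busy, "priority", 10))  # False
```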

First, update the configuration:

celery_app.conf.update(broker_transport_options={"queue_order_strategy": "priority"})
The second step is to start the worker with the queues in the right order, listing higher-priority queues first:

celery worker -Q higher_priority_queue,other_queue
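As far as I understand, when a new task arrives in a higher-priority queue, this setting does not stop a task from a lower-priority queue that is already running; the worker waits for it to finish. After that, it checks the higher-priority queue again, and if that queue contains a new task, it runs it regardless of whether the lower-priority queues hold more tasks.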
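The non-preemptive behavior described above can be illustrated with a small discrete-time simulation. It assumes a single worker process, a fixed task duration, and `priority` ordering; `simulate_priority_worker` and its parameters are hypothetical names for this sketch, not Celery APIs.

```python
from collections import deque


def simulate_priority_worker(queue_order, initial, arrivals, duration):
    """Toy single-process worker using "priority" ordering, with no preemption.

    queue_order: queue names as passed to -Q, highest priority first.
    initial: dict queue -> list of tasks already waiting at t=0.
    arrivals: list of (time, queue, task) published later.
    duration: how long every task runs.
    Returns a list of (start_time, task).
    """
    pending = {q: deque(initial.get(q, [])) for q in queue_order}
    future = sorted(arrivals)
    clock, started = 0, []
    while any(pending.values()) or future:
        # Deliver everything published up to the current time.
        while future and future[0][0] <= clock:
            _, queue, task = future.pop(0)
            pending[queue].append(task)
        ready = [q for q in queue_order if pending[q]]
        if not ready:
            clock = future[0][0]  # idle until the next publish
            continue
        task = pending[ready[0]].popleft()  # highest-priority non-empty queue
        started.append((clock, task))
        clock += duration  # runs to completion; arrivals cannot interrupt it
    return started


timeline = simulate_priority_worker(
    queue_order=["higher_priority_queue", "other_queue"],
    initial={"other_queue": ["low-0", "low-1"]},
    arrivals=[(1, "higher_priority_queue", "high-0")],
    duration=2,
)
print(timeline)  # [(0, 'low-0'), (2, 'high-0'), (4, 'low-1')]
```

`high-0` arrives at t=1 while `low-0` is running, so it has to wait until t=2, but it then starts ahead of `low-1`.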
