我在这个问题上花了很长时间,但我仍然不知道是什么问题。我试图将不同的任务路由到他们自己的特定队列。在RabbitMQ中,我有默认的celery
队列和另一个名为test_queue
的队列。我可以很好地将任务推送到celery
队列中,但是我很难弄清楚如何将任务推送到test_queue
队列中。我已经阅读了芹菜设置task_default_queue
,task_queue
和task_routes
的文档。
celery.py
from __future__ import absolute_import, unicode_literals
from celery import Celery
app = Celery('celery_test',
broker_url='amqp://',
backend='amqp://',
include=['celery_test.tasks'],
worker_max_tasks_per_child=1,
task_create_missing_queues=True,
#task_queues = {'test_queue': {'exchange': 'test_queue', 'routing_key': 'test_queue'}},
#task_routes = {'celery_test.tasks.test': {'queue': 'test_queue'}}
#task_default_queue='test_queue'
)
# Optional configuration, see the application user guide.
app.conf.update(result_expires=3600,)
app.conf.task_routes = {'celery_test.tasks.test': {'queue': 'test_queue'}}
if __name__ == '__main__':
app.start()
[我注意到,当我在test_queue
构造函数中指定Celery
队列名称时,我的任务没有正确地路由到正确的RabbitMQ队列。相反,我不得不使用app.conf.task_routes = {'celery_test.tasks.test': {'queue': 'test_queue'}}
完成工作。我知道这看似微不足道,但是我已经花了好几个小时不停地努力,我不知道为什么一种方法行得通,而另一种方法行不通。
我正在使用
据我所知,Celery 4.x(类)构造函数未采用您提到的那些参数(task_queues,task_routes和task_default_queue)。