我正在Celery中设置一个任务,让它从某个主题交换中“消费”。当我将消息发送到有问题的交换机时,我收到错误:“收到并删除了一条未知消息。错误的目的地?!?”在芹菜控制台上。
我已经创建了一个单独的项目文件夹来复制问题,其中所有内容都称为test-something,具有以下结构:
celery-test/
L celery.py
L celeryconfig.py
L tasks.py
我已经看到了各种StackOverflow问题以及引用librabbitmq包的GitHub问题。这里的解决方案是卸载这个软件包,但我甚至没有安装它,所以这让我无处可去。发现的一些问题/问题提出了这个解决方案: - https://github.com/celery/celery/issues/3675 - Celery &Rabbitmq:WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?- a experiment on the GIT
我也试过玩任务路由设置,因为我认为问题就在那里,但我无法让它工作。
对于任何想知道为什么端口被1关闭的人来说,那是因为它指向我的docker容器中的rabbitmq,它不能再使用5672了。
celery.朋友
app = Celery('celery_test', include=['celery_test.tasks'])
app.config_from_object('celery_test.celeryconfig')
celery config.朋友
broker_url = 'amqp://guest:guest@localhost:5673//'
result_backend = 'rpc://'
default_exchange = Exchange('default', type='direct')
test_exchange = Exchange('test_exchange', type='topic')
task_queues = (
Queue('default', default_exchange, routing_key='default'),
Queue('test_queue', test_exchange, routing_key='test123test')
)
task_routes = {
'celery_test.tasks.test_method': {
'queue': 'test_queue'
}
}
tasks.朋友
@app.task
def test_method():
print('test_method')
return 'test_method'
然后我用来发送消息的文件:send.py
connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5673/'))
channel = connection.channel()
exchange = 'test_exchange'
routing_key = 'test123test'
message = 'Testmessage'
channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message)
connection.close()
这可能不是一个真正的答案,而是更多的后续行动。但我想我会让人们知道谁遇到了这个问题。 (这篇文章是我的全部解释,因为我是芹菜的新手,你应该带着一粒盐。)
所以基本上我认为这种情况发生的原因是因为Celery不理解这个消息。 Celery需要大量的标题和其他属性才能理解messesage正在尝试做什么。
可以模拟这些标题进行逆向工程,但我不打算通过它,因为有更简单的方法来解决我计划制作的应用程序。
如果有人在阅读这篇文章时有更多关于这个主题的经验,请随时纠正我。