已确认的 kombu 消息未从 RabbitMQ 队列中删除

问题描述 投票:0回答:1

我正在尝试创建一个 Python 脚本,该脚本等待 RabbitMQ 队列上的消息以在子进程中启动任务。在任务执行期间,脚本继续使用另一个队列来取消订单,这将停止任务。

我使用

kombu
包来处理与 RabbitMQ 的交互。当任务终止时(无论是正常终止还是因为取消),我会调用
message.ack()

尽管调用了

message.ack()
,但启动消息并未从队列中删除(我可以通过使用
rabbitmqctl
来判断)。即使任务已完成,这也会导致消息被重新传递。

我创建了一个示例存储库来显示问题。自述文件显示了复制步骤。

我不知道问题出在哪里。我意识到有很多移动部分,但这是一个实际项目的稍微简化的版本,有自己的限制(比如 Python 版本被固定为 3.8)。我愿意接受任何更好地使用

kombu
asyncio
的建议,因为我对 AMQP 和异步 Python 还很陌生。

python rabbitmq python-asyncio kombu
1个回答
0
投票

我终于找到问题的原因了。其实有两个原因:

  1. 我在创建消费者时没有设置频道,因此它们都使用相同的频道。由于我为“停止”消费者设置了
    noAck=True
    ,因此调用
    message.ack()
    没有效果。奇怪的是,
    noAck=True
    应该导致在接收时发送ACK,即使对于开始消息也是如此,但事实并非如此。
  2. 我错误地理解了
    on_done
    回调的关闭。我认为闭包在创建时包含
    message
    ,但事实并非如此。相反,它包含收到的最后一条消息,即“停止”消息。我对 Python 很陌生,所以我不确定闭包是如何工作的。我以为它像 JavaScript 一样工作,但似乎不是。总的来说,我在创建任务时存储了该消息,并将其作为参数返回给
    on_done
    回调,并且它起作用了。

直到我使用调试器一步步检查执行情况后才找到解决方案。这再次显示了调试器在故障排除时的优越性。我应该早点走的。

© www.soinside.com 2019 - 2024. All rights reserved.