我正在尝试创建一个 Python 脚本,该脚本等待 RabbitMQ 队列上的消息以在子进程中启动任务。在任务执行期间,脚本继续使用另一个队列来取消订单,这将停止任务。
我使用
kombu
包来处理与 RabbitMQ 的交互。当任务终止时(无论是正常终止还是因为取消),我会调用 message.ack()
。
尽管调用了
message.ack()
,但启动消息并未从队列中删除(我可以通过使用rabbitmqctl
来判断)。即使任务已完成,这也会导致消息被重新传递。
我创建了一个示例存储库来显示问题。自述文件显示了复制步骤。
我不知道问题出在哪里。我意识到有很多移动部分,但这是一个实际项目的稍微简化的版本,有自己的限制(比如 Python 版本被固定为 3.8)。我愿意接受任何更好地使用
kombu
或 asyncio
的建议,因为我对 AMQP 和异步 Python 还很陌生。
我终于找到问题的原因了。其实有两个原因:
noAck=True
,因此调用message.ack()
没有效果。奇怪的是,noAck=True
应该导致在接收时发送ACK,即使对于开始消息也是如此,但事实并非如此。on_done
回调的关闭。我认为闭包在创建时包含 message
,但事实并非如此。相反,它包含收到的最后一条消息,即“停止”消息。我对 Python 很陌生,所以我不确定闭包是如何工作的。我以为它像 JavaScript 一样工作,但似乎不是。总的来说,我在创建任务时存储了该消息,并将其作为参数返回给 on_done
回调,并且它起作用了。直到我使用调试器一步步检查执行情况后才找到解决方案。这再次显示了调试器在故障排除时的优越性。我应该早点走的。