我在 celery 中有一个任务只能运行一次,因为它每次运行时都需要从不同的文件夹导入我的模块和子模块的新副本。 Celery 处理得不好,它会保留上一次运行的模块。似乎唯一的解决方案是在任务完成后关闭 worker。
然而,当试图将其关闭时,消息又回到了队列中。
我试过了
注:
我正在使用 rabbitmq 作为消息代理。这是一项长时间运行的任务(数小时),我们只希望在每个工作人员上一次运行一个任务。没有预取,所以并发设置为 1 并且 ack_late = True。
您可以使用 Redis 之类的东西甚至数据库来跟踪您的任务是否已完成。只需为每个任务存储一个唯一的 ID 和一个标志来说明它是否已完成。
任务代码中可以查看存储是否完成。如果是,则退出而不做任何事情。如果没有,请执行任务,然后更新存储以表明它已完成。
您需要在任务完成后手动确认任务。这会阻止它返回队列。您可以在任务代码中使用类似 message.ack() 的东西,其中 message 是任务消息对象。
因此,使用此设置,即使 worker 关闭并且任务返回队列,存储也会显示它已完成并且任务不会再次运行。希望这有帮助!