我正在开发一个 Flask 应用程序,使用 Celery 和 RabbitMQ 作为消息代理。 我想在开始处理之前检查任务是否已成功添加到 RabbitMQ 队列中。
目前,我正在使用 AsyncResult(task_id) 来检查任务的状态。但当我检查从未创建的任务 ID 时,它会响应“待处理”。
是否有办法使用 Flask 中的 Celery 检查任务是否已添加到 RabbitMQ 的队列中? 有没有人遇到过这个问题,如果是的话,处理它的最佳方法是什么? 谢谢!
您可以在 Celery 中启用task_send_sent_event Celery 提供了 task_send_sent_event 设置,当任务成功发送到代理时,该设置会发出一个事件。然后使用像Flower这样的工具。
或者您可以使用 pika 直接检查 RabbitMQ 队列。
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
queue_name = 'your_queue_name'
# Check the queue status
queue_status = channel.queue_declare(queue=queue_name, passive=True)
print(f"Messages in queue: {queue_status.method.message_count}")
connection.close()
如果任务添加成功,将会反映在队列的消息计数中。