用例是使用联合交换将所有 celery 消息联合到第二个rabbitmq节点以进行备份(以防发生灾难)。
我已经在2个rabbitmq节点之间建立了联合交换。我的 django 应用程序 (https://medium.com/@techWithAditya/building-scalable-applications-with-django-celery-and-rabbitmq-a-step-by-step-guide-fc58bccc8cad) 连接到节点 1 (在此设置中是上游)。现在,当我发布任务时,它会在上游排队,但不会联合,但是当我手动发布到 celery Exchange(通过独立发布者)时,它会联合。
仅当消息通过 celery 发布时才会出现问题
当我直接在 celery 交易所发布时,联合工作正常。它还可以使用用 kombu 编写的独立 mq 生成器
使用以下命令设置上游
rabbitmqctl set_parameter federation-upstream origin '{"uri":"amqps://user:password@upstream-hostname:5671?cacertfile=<path-to-pem>"}' -p celery_vhost
使用以下命令应用策略
rabbitmqctl set_policy exchange-federation "^celery$" '{"federation-upstream-set":"all"}' --apply-to exchanges -p celery_vhost
终于让联盟开始工作了!
虽然没有找到确切的根本原因,但问题似乎与 celery 使用的交换类型有关。
将交流类型从直接更改为主题对我有用。
为了使用自定义队列,在我的 django 应用程序的 celery 设置中添加了以下行
celery_app.conf.task_queues = (Queue('celery', Exchange('celery', type='topic'), routing_key='celery'),)