我是RabbitMQ和Pika的新手,但我认为我已经清楚地了解了它的工作原理。
我需要实现这一点:
生产者创建消息并通过扇出交换发送,多个生产者(测试环境中为2个)收到相同的消息。
但是只有一位消费者同时收到消息
2019-11-29 19:02:44.167549 b'Hello'-第一消费者
2019-11-29 19:02:45.068192 b'Hello'-第二个消费者
制作人:
async def main(loop):
connection = await connect_robust(
"amqp://guest:[email protected]/", loop=loop
)
queue_name = "test_queue"
routing_key = "test_queue"
# Creating channel
channel = await connection.channel()
# Declaring exchange
exchange = await channel.declare_exchange('test_exchange',
ExchangeType.FANOUT, auto_delete=True
)
# Declaring queue
queue = await channel.declare_queue(
queue_name, auto_delete=True
)
# Binding queue
await queue.bind(exchange, routing_key)
await exchange.publish(
Message(
bytes('Hello', 'utf-8'),
content_type='text/plain',
headers={'foo': 'bar'}
),
routing_key
)
)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
消费者:
async def main(loop):
connection = await aio_pika.connect_robust(host='192.168.1.3', login='guest', password='guest', loop=loop
)
queue_name = "test_queue"
async with connection:
# Creating channel
channel = await connection.channel()
# Declaring queue
queue = await channel.declare_queue(
queue_name, auto_delete=True
)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
print(datetime.datetime.now(), message.body)
if queue.name in message.body.decode():
break
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
首先,我假设您正在运行两个单独的使用者进程。
每个使用者应将其自己的队列绑定到扇出交换机。不要使用共享队列。一种解决方案是让每个使用者使用互斥队列。
只要您的消费者先启动,生产者就无需创建队列并将其绑定到扇出交换机。
首先尝试。然后,如果您需要考虑生产者可以首先启动,则它必须创建两个具有众所周知名称的队列并将其绑定。消费者开始时应该做同样的事情。
[NOTE: RabbitMQ团队监视rabbitmq-users
mailing list,并且有时仅在StackOverflow上回答问题。