我将 FastApi(0.78) 与 AioPika(版本 9.0.5)一起用于使用 rabbit 消费和发布消息的服务。 我的应用程序在带有 aws rabbit broker 的 aws 结构中的 k8s 上运行。 Rabbit broker 在集群中工作,有 3 个实例用于负载平衡消息。每周或每两周 aws 都会对代理进行维护,并且这样做就像关闭 1 个实例更新它并再次运行到集群。 在使用旧版本应用程序关闭最后一个代理实例后,它试图重新连接到代理,但它只连接到代理的新实例,而不是连接到消费者队列。
async def startup() -> None:
"""Execute messages consumption on the application startup."""
loop = asyncio.get_event_loop()
task = loop.create_task(consume(loop))
await task
async def consume(loop: BaseEventLoop) -> Connection:
"""Consume messages from RabbitMQ.
loop: Event loop.
Aio-pika connection.
logger.info("Starting messages consumption...")
conn: Connection = await connect_robust(settings.rabbitmq_url, loop=loop)
channel = await conn.channel()
exchange = await channel.get_exchange(settings.rabbitmq_exchange)
queue_bridge = await channel.declare_queue('queue_name')
await queue_bridge.bind(exchange, 'queue_name')
await queue_bridge.consume(on_message)
return conn
async def on_message(message: IncomingMessage) -> None:
"""Process incoming message with report.
message: Message received from the RabbitMQ.
async with message.process(ignore_processed=True):
message_body = json.loads(message.body.decode('utf-8'))
logger.info("Received message on %s", message.routing_key, extra={
'body': message_body,
if message.routing_key == 'routing.key.name':
Do something with message payload
根据 aio-pike 文档,我应该使用
Unexpected connection close from remote "amqps:SOMETHING", Connection.Close(reply_code=320, reply_text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
NoneType: None
Unexpected connection close from remote "amqps:SOMETHING", Connection.Close(reply_code=320, reply_text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
NoneType: None
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.