我将 FastApi(0.78) 与 AioPika(版本 9.0.5)一起用于使用 rabbit 消费和发布消息的服务。 我的应用程序在带有 aws rabbit broker 的 aws 结构中的 k8s 上运行。 Rabbit broker 在集群中工作,有 3 个实例用于负载平衡消息。每周或每两周 aws 都会对代理进行维护,并且这样做就像关闭 1 个实例更新它并再次运行到集群。 在使用旧版本应用程序关闭最后一个代理实例后,它试图重新连接到代理,但它只连接到代理的新实例,而不是连接到消费者队列。
我实现从兔子消费的代码如下:
与消费者一起启动循环任务:
main.py
@app.on_event('startup')
async def startup() -> None:
"""Execute messages consumption on the application startup."""
loop = asyncio.get_event_loop()
task = loop.create_task(consume(loop))
await task
连接兔子消费的方法
tasks.py
async def consume(loop: BaseEventLoop) -> Connection:
"""Consume messages from RabbitMQ.
Args:
loop: Event loop.
Returns:
Aio-pika connection.
"""
logger.info("Starting messages consumption...")
conn: Connection = await connect_robust(settings.rabbitmq_url, loop=loop)
channel = await conn.channel()
exchange = await channel.get_exchange(settings.rabbitmq_exchange)
queue_bridge = await channel.declare_queue('queue_name')
await queue_bridge.bind(exchange, 'queue_name')
await queue_bridge.consume(on_message)
return conn
async def on_message(message: IncomingMessage) -> None:
"""Process incoming message with report.
Args:
message: Message received from the RabbitMQ.
"""
async with message.process(ignore_processed=True):
message_body = json.loads(message.body.decode('utf-8'))
logger.info("Received message on %s", message.routing_key, extra={
'body': message_body,
})
if message.routing_key == 'routing.key.name':
Do something with message payload
根据 aio-pike 文档,我应该使用
connect_robust
,它在失去连接后重新连接所有内容(连接、交换、队列)并且应用程序正在这样做:
Unexpected connection close from remote "amqps:SOMETHING", Connection.Close(reply_code=320, reply_text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
NoneType: None
Unexpected connection close from remote "amqps:SOMETHING", Connection.Close(reply_code=320, reply_text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
NoneType: None
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
应用程序成功连接到新的代理实例,但没有连接到交换器和队列(没有消费者)。
看了aio-pika的很多issue,说重连有问题,现在解决了。我不知道我做错了什么。
希望大家能帮忙