FastApi Aio-Pika 重置后重新连接到 rabbit Broker 的问题

问题描述 投票:0回答:0

我将 FastApi(0.78) 与 AioPika(版本 9.0.5)一起用于使用 rabbit 消费和发布消息的服务。 我的应用程序在带有 aws rabbit broker 的 aws 结构中的 k8s 上运行。 Rabbit broker 在集群中工作,有 3 个实例用于负载平衡消息。每周或每两周 aws 都会对代理进行维护,并且这样做就像关闭 1 个实例更新它并再次运行到集群。 在使用旧版本应用程序关闭最后一个代理实例后,它试图重新连接到代理,但它只连接到代理的新实例,而不是连接到消费者队列。

我实现从兔子消费的代码如下:

与消费者一起启动循环任务:

main.py

@app.on_event('startup')
async def startup() -> None:
    """Execute messages consumption on the application startup."""
    loop = asyncio.get_event_loop()
    task = loop.create_task(consume(loop))

    await task

连接兔子消费的方法

tasks.py


async def consume(loop: BaseEventLoop) -> Connection:
    """Consume messages from RabbitMQ.

    Args:
        loop: Event loop.

    Returns:
        Aio-pika connection.
    """
    logger.info("Starting messages consumption...")

    conn: Connection = await connect_robust(settings.rabbitmq_url, loop=loop)
    channel = await conn.channel()
    exchange = await channel.get_exchange(settings.rabbitmq_exchange)
    queue_bridge = await channel.declare_queue('queue_name')

    await queue_bridge.bind(exchange, 'queue_name')
    await queue_bridge.consume(on_message)

    return conn


async def on_message(message: IncomingMessage) -> None:
    """Process incoming message with report.

    Args:
        message: Message received from the RabbitMQ.
    """
    async with message.process(ignore_processed=True):
        message_body = json.loads(message.body.decode('utf-8'))

        logger.info("Received message on %s", message.routing_key, extra={
            'body': message_body,
        })

        if message.routing_key == 'routing.key.name':
            Do something with message payload

根据 aio-pike 文档,我应该使用

connect_robust
,它在失去连接后重新连接所有内容(连接、交换、队列)并且应用程序正在这样做:

Unexpected connection close from remote "amqps:SOMETHING", Connection.Close(reply_code=320, reply_text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
NoneType: None
Unexpected connection close from remote "amqps:SOMETHING", Connection.Close(reply_code=320, reply_text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
NoneType: None
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.
Connection attempt to "amqps:SOMETHING" failed: [Errno 111] Connect call failed (SOME IP AND PORT). Reconnecting after 5 seconds.

应用程序成功连接到新的代理实例,但没有连接到交换器和队列(没有消费者)。

看了aio-pika的很多issue,说重连有问题,现在解决了。我不知道我做错了什么。

希望大家能帮忙

python-3.x rabbitmq fastapi
© www.soinside.com 2019 - 2024. All rights reserved.