如何在 Python SDK 中使 Azure 服务总线客户端和发送方实例保持活动状态超过 10 分钟?

问题描述 投票:0回答:1

我目前正在致力于 Azure 服务总线队列和自定义网关服务的集成。每次服务收到事件时,都应该将消息发送到专用队列。 我已成功通过建立

ServiceBusClient
ServiceBusSender
实例来配置异步方法。

但是,根据Azure SDK for Python的官方文档,AMQP操作之间有10分钟的空闲限制。 就我而言,事件之间的间隔可能超过 10 分钟,因此当前连接确实已断开。当事件最终到来时,它会从“冷启动”重新创建。

有一个

keep-alive
参数,但它只显示10分钟时间范围内的调用频率。

只要事件之间的延迟超过 10 分钟,我就不想花时间重新创建。

如何避免这种情况?重置 IDLE 超时最安全的方法是什么?

我尝试访问 AMQP 连接对象,该对象在幕后使用 - 但它并不总是存在。还有一些 LINK_CREDIT 值,仅当发生任何合法操作时才会更新。但合法的操作都是向队列发送消息,这是我不想做的。

队列中的虚拟消息也不吸引我,因为它浪费资源,并且队列将由其他服务处理。

理想状态 - 一些 PING 或运行状况检查调用,只是为了保持连接处于活动状态并刷新 IDLE 超时。

azureservicebus amqp azure-servicebus-queues keep-alive azure-python-sdk
1个回答
0
投票

消息系统中的Keep-Alive机制可确保连接保持活动状态,防止由于不活动而超时和断开连接。您可以这样实现:

KEEP_ALIVE_INTERVAL = 300
 time.sleep(KEEP_ALIVE_INTERVAL)
     keep_alive_thread = threading.Thread(target=keep_service_bus_alive, args=(servicebus_client, KEEP_ALIVE_INTERVAL))

为了确保连接保持活动状态,您需要比 10 分钟空闲超时更频繁地“ping”服务总线。安全间隔约为 5 分钟(300 秒) 甚至更短。

生存时间 (TTL) 是一个消息属性,指定消息应在队列或主题中保留多长时间,然后才会被视为过期并有资格进行删除。您可以像这样使用它:

TIME_TO_LIVE = datetime.timedelta(seconds=30)

def send_data_message(sender):
    data_body = b'Hello from data message!'
    application_properties = {"body_type": "data"}
    data_message = ServiceBusMessage(
        body=data_body,
        application_properties=application_properties,
        time_to_live=TIME_TO_LIVE
    )
    sender.send_messages(data_message)

下面的 Python 代码是如何通过 Azure 服务总线使用保持活动机制和生存时间属性。

logging.basicConfig(level=logging.DEBUG)


CONNECTION_STRING = "AzureServiceBusConnectionString"

queue_name = "QueueName"

KEEP_ALIVE_INTERVAL = 300 

def keep_service_bus_alive(servicebus_client, interval):
    while True:
        try:
          
            logging.info("Pinging Service Bus to keep connection alive...")
            with servicebus_client.get_queue_sender(queue_name) as sender:
                logging.info("Connection kept alive")
            time.sleep(interval)
        except Exception as e:
            logging.error(f"Error keeping Service Bus alive: {e}")

def main():
 
    servicebus_client = ServiceBusClient.from_connection_string(CONNECTION_STRING)
    queue_sender = servicebus_client.get_queue_sender(queue_name)
    queue_receiver = servicebus_client.get_queue_receiver(queue_name)

    import threading
    keep_alive_thread = threading.Thread(target=keep_service_bus_alive, args=(servicebus_client, KEEP_ALIVE_INTERVAL))
    keep_alive_thread.daemon = True  
    keep_alive_thread.start()

    logging.info("Sending two messages with TTL of  seconds")
    with queue_sender:
        for i in range(2):
            message = ServiceBusMessage(
                f'Message {i+1}',  
                time_to_live=timedelta(seconds=30) 
            )
            queue_sender.send_messages(message)
            time.sleep(1)  

    logging.info("Receiving messages")
    received_count = 0
    with queue_receiver:
        messages = queue_receiver.receive_messages(max_message_count=2, max_wait_time=5)
        for msg in messages:
            logging.info(f"Received message: {msg}")
            queue_receiver.complete_message(msg)
            received_count += 1

    assert received_count == 2, f"Expected 2 messages, but got {received_count}"

    logging.info("Test completed successfully")

if __name__ == "__main__":
    main()


输出:

enter image description here

有关更多详细信息,请参阅 git 和此链接,了解有关 Keep-Alive 的信息。

© www.soinside.com 2019 - 2024. All rights reserved.