我目前正在致力于 Azure 服务总线队列和自定义网关服务的集成。每次服务收到事件时,都应该将消息发送到专用队列。 我已成功通过建立
ServiceBusClient
和 ServiceBusSender
实例来配置异步方法。
但是,根据Azure SDK for Python的官方文档,AMQP操作之间有10分钟的空闲限制。 就我而言,事件之间的间隔可能超过 10 分钟,因此当前连接确实已断开。当事件最终到来时,它会从“冷启动”重新创建。
有一个
keep-alive
参数,但它只显示10分钟时间范围内的调用频率。
只要事件之间的延迟超过 10 分钟,我就不想花时间重新创建。
如何避免这种情况?重置 IDLE 超时最安全的方法是什么?
我尝试访问 AMQP 连接对象,该对象在幕后使用 - 但它并不总是存在。还有一些 LINK_CREDIT 值,仅当发生任何合法操作时才会更新。但合法的操作都是向队列发送消息,这是我不想做的。
队列中的虚拟消息也不吸引我,因为它浪费资源,并且队列将由其他服务处理。
理想状态 - 一些 PING 或运行状况检查调用,只是为了保持连接处于活动状态并刷新 IDLE 超时。
消息系统中的Keep-Alive机制可确保连接保持活动状态,防止由于不活动而超时和断开连接。您可以这样实现:
KEEP_ALIVE_INTERVAL = 300
time.sleep(KEEP_ALIVE_INTERVAL)
keep_alive_thread = threading.Thread(target=keep_service_bus_alive, args=(servicebus_client, KEEP_ALIVE_INTERVAL))
为了确保连接保持活动状态,您需要比 10 分钟空闲超时更频繁地“ping”服务总线。安全间隔约为 5 分钟(300 秒) 甚至更短。
生存时间 (TTL) 是一个消息属性,指定消息应在队列或主题中保留多长时间,然后才会被视为过期并有资格进行删除。您可以像这样使用它:
TIME_TO_LIVE = datetime.timedelta(seconds=30)
def send_data_message(sender):
data_body = b'Hello from data message!'
application_properties = {"body_type": "data"}
data_message = ServiceBusMessage(
body=data_body,
application_properties=application_properties,
time_to_live=TIME_TO_LIVE
)
sender.send_messages(data_message)
下面的 Python 代码是如何通过 Azure 服务总线使用保持活动机制和生存时间属性。
logging.basicConfig(level=logging.DEBUG)
CONNECTION_STRING = "AzureServiceBusConnectionString"
queue_name = "QueueName"
KEEP_ALIVE_INTERVAL = 300
def keep_service_bus_alive(servicebus_client, interval):
while True:
try:
logging.info("Pinging Service Bus to keep connection alive...")
with servicebus_client.get_queue_sender(queue_name) as sender:
logging.info("Connection kept alive")
time.sleep(interval)
except Exception as e:
logging.error(f"Error keeping Service Bus alive: {e}")
def main():
servicebus_client = ServiceBusClient.from_connection_string(CONNECTION_STRING)
queue_sender = servicebus_client.get_queue_sender(queue_name)
queue_receiver = servicebus_client.get_queue_receiver(queue_name)
import threading
keep_alive_thread = threading.Thread(target=keep_service_bus_alive, args=(servicebus_client, KEEP_ALIVE_INTERVAL))
keep_alive_thread.daemon = True
keep_alive_thread.start()
logging.info("Sending two messages with TTL of seconds")
with queue_sender:
for i in range(2):
message = ServiceBusMessage(
f'Message {i+1}',
time_to_live=timedelta(seconds=30)
)
queue_sender.send_messages(message)
time.sleep(1)
logging.info("Receiving messages")
received_count = 0
with queue_receiver:
messages = queue_receiver.receive_messages(max_message_count=2, max_wait_time=5)
for msg in messages:
logging.info(f"Received message: {msg}")
queue_receiver.complete_message(msg)
received_count += 1
assert received_count == 2, f"Expected 2 messages, but got {received_count}"
logging.info("Test completed successfully")
if __name__ == "__main__":
main()
输出: