我有一个 python 服务,它使用 azure 服务总线主题来发送和接收消息。我正在使用 azure-servicebus==7.12.2.
我可以在 Azure 门户上设置的最大超时为 5 分钟,但处理主题消息大约需要 60 分钟。因此,我按照网上几篇帖子的建议使用 AutoLockRenewer。
这是我的代码
async def receive_messages_from_topic(
servicebus_client: ServiceBusClient,
topic_name: str,
executor: Any, # type: ignore
):
async with servicebus_client:
renewer = AutoLockRenewer(max_lock_renewal_duration=3600)
async with servicebus_client.get_subscription_receiver(
topic_name=topic_name,
subscription_name=SUBSCRIPTION_NAME,
auto_lock_renewer=renewer, # type: ignore
) as receiver:
logger.info(f"Listening to topic: {topic_name} for subscription: {SUBSCRIPTION_NAME}")
while True:
try:
messages = await receiver.receive_messages()
for message in messages:
# Process the message
logger.info(
f"Received message from topic: {topic_name}, msg: {str(message)}"
)
with make_temp_directory() as temp_dir:
try:
bytes_str = str(message)
bytes = ast.literal_eval(bytes_str)
executor(topic_name, bytes, temp_dir)
await receiver.complete_message(message)
except Exception as e:
logger.exception(
f"Error executing job for topic: {topic_name} message: {message.message_id}, {e}"
)
await receiver.dead_letter_message(message)
if len(messages) == 0:
# Small sleep to avoid busy looping
await asyncio.sleep(60)
except Exception as e:
logger.exception(f"Error processing message for topic: {topic_name}, {e}")
executor(topic_name, bytes, temp_dir) 将在 40 分钟内完成处理,没有问题或错误。但是,当我尝试在上面的循环中将消息标记为完成时,它将抛出异常
Traceback (most recent call last):
File "/home/vscode/.cache/bazel/_bazel_vscode/d9fd8dd9485b3d1d1994c3846baefa35/execroot/_main/bazel-out/k8-opt-release/bin/tools/service_bus/run/run.runfiles/_main/tools/service_bus/run/run.py", line 72, in receive_messages_from_topic
await receiver.complete_message(message)
File "/home/vscode/.local/lib/python3.10/site-packages/azure/servicebus/aio/_servicebus_receiver_async.py", line 852, in complete_message
await self._settle_message_with_retry(message, MESSAGE_COMPLETE)
File "/home/vscode/.local/lib/python3.10/site-packages/azure/servicebus/aio/_servicebus_receiver_async.py", line 494, in _settle_message_with_retry
raise MessageLockLostError(
azure.servicebus.exceptions.MessageLockLostError: The lock on the message lock has expired.
我尝试在消息和会话上设置 AutoLockRenewer,结果仍然相同。 https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/index.html#automatically-renew-message-or-session-locks
不确定我是否遗漏了某些内容或我的实现有问题
我尝试使用下面的代码来处理来自 Azure 服务总线主题的消息,并在 5 分钟后完成它们。我正在使用
AutoLockRenewer
在处理消息期间自动将消息锁定延长最多 60 分钟。
代码:
import asyncio
import logging
from azure.servicebus.aio import ServiceBusClient, AutoLockRenewer
from contextlib import contextmanager
import tempfile
import shutil
import os
import time
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
SUBSCRIPTION_NAME = "<sub_name>"
TOPIC_NAME = "<topic_name>"
CONNECTION_STRING = "<Connec_Str>"
@contextmanager
def make_temp_directory():
temp_dir = tempfile.mkdtemp()
try:
yield temp_dir
finally:
shutil.rmtree(temp_dir)
def executor(topic_name, message_data, temp_dir):
logger.info(f"Processing message from topic: {topic_name}, data: {message_data}, temp dir: {temp_dir}")
with open(os.path.join(temp_dir, 'message.txt'), 'w') as f:
f.write(str(message_data))
async def receive_messages_from_topic(servicebus_client: ServiceBusClient, topic_name: str, executor: any, duration_seconds: int = 300):
async with servicebus_client:
renewer = AutoLockRenewer(max_lock_renewal_duration=3600)
async with servicebus_client.get_subscription_receiver(
topic_name=topic_name,
subscription_name=SUBSCRIPTION_NAME,
auto_lock_renewer=renewer,
) as receiver:
logger.info(f"Listening to topic: {topic_name} for subscription: {SUBSCRIPTION_NAME}")
start_time = time.time()
end_time = start_time + duration_seconds
while time.time() < end_time:
try:
messages = await receiver.receive_messages()
if not messages:
await asyncio.sleep(10)
continue
for message in messages:
message_content = str(message)
logger.info(f"Received message from topic: {topic_name}, msg: {message_content}")
with make_temp_directory() as temp_dir:
try:
executor(topic_name, message_content, temp_dir)
await receiver.complete_message(message)
except Exception as e:
logger.exception(f"Error executing job for topic: {topic_name} message: {message.message_id}, {e}")
await receiver.dead_letter_message(message)
except asyncio.CancelledError:
logger.info("Receive operation was cancelled.")
break
except Exception as e:
logger.exception(f"Error processing message for topic: {topic_name}, {e}")
async def main():
servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STRING)
await receive_messages_from_topic(servicebus_client, TOPIC_NAME, executor)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Process interrupted by user.")
输出:
以下代码运行成功如下所示。