Azure Service Bus message lock auto-renewal not working


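I have a Python service that uses Azure Service Bus topics to send and receive messages. I am using azure-servicebus==7.12.2.

The maximum message lock duration I can set in the Azure portal is 5 minutes, but processing a topic message takes about 60 minutes. So I am using AutoLockRenewer, as suggested in several posts online.

Here is my code: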

async def receive_messages_from_topic(
    servicebus_client: ServiceBusClient,
    topic_name: str,
    executor: Any,  # type: ignore
):
    async with servicebus_client:
        renewer = AutoLockRenewer(max_lock_renewal_duration=3600)
        async with servicebus_client.get_subscription_receiver(
            topic_name=topic_name,
            subscription_name=SUBSCRIPTION_NAME,
            auto_lock_renewer=renewer,  # type: ignore
        ) as receiver:
            logger.info(f"Listening to topic: {topic_name} for subscription: {SUBSCRIPTION_NAME}")
            while True:
                try:
                    messages = await receiver.receive_messages()
                    for message in messages:
                        # Process the message
                        logger.info(
                            f"Received message from topic: {topic_name}, msg: {str(message)}"
                        )

                        with make_temp_directory() as temp_dir:
                            try:
                                bytes_str = str(message)
                                bytes = ast.literal_eval(bytes_str)
                                executor(topic_name, bytes, temp_dir)
                                await receiver.complete_message(message)
                            except Exception as e:
                                logger.exception(
                                    f"Error executing job for topic: {topic_name} message: {message.message_id}, {e}"
                                )
                                await receiver.dead_letter_message(message)

                    if len(messages) == 0:
                        # Small sleep to avoid busy looping
                        await asyncio.sleep(60)
                except Exception as e:
                    logger.exception(f"Error processing message for topic: {topic_name}, {e}")

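executor(topic_name, bytes, temp_dir) finishes processing in about 40 minutes with no problems or errors. However, when I then try to mark the message as completed in the loop above, it throws this exception: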

Traceback (most recent call last):
  File "/home/vscode/.cache/bazel/_bazel_vscode/d9fd8dd9485b3d1d1994c3846baefa35/execroot/_main/bazel-out/k8-opt-release/bin/tools/service_bus/run/run.runfiles/_main/tools/service_bus/run/run.py", line 72, in receive_messages_from_topic
    await receiver.complete_message(message)
  File "/home/vscode/.local/lib/python3.10/site-packages/azure/servicebus/aio/_servicebus_receiver_async.py", line 852, in complete_message
    await self._settle_message_with_retry(message, MESSAGE_COMPLETE)
  File "/home/vscode/.local/lib/python3.10/site-packages/azure/servicebus/aio/_servicebus_receiver_async.py", line 494, in _settle_message_with_retry
    raise MessageLockLostError(
azure.servicebus.exceptions.MessageLockLostError: The lock on the message lock has expired.

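I tried setting up AutoLockRenewer on both the message and the session, and the result is the same. https://azuresdkdocs.blob.core.windows.net/$web/python/azure-servicebus/latest/index.html#automatically-renew-message-or-session-locks

I'm not sure whether I'm missing something or whether my implementation is wrong.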
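One way to narrow this down is to log how much lock time is left right before and right after the executor call. The sketch below is a small helper for that. `lock_seconds_left` is a hypothetical name, and it assumes the received message exposes a timezone-aware `locked_until_utc` datetime that the SDK updates on each successful renewal.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def lock_seconds_left(
    locked_until_utc: Optional[datetime],
    now: Optional[datetime] = None,
) -> Optional[float]:
    """Return seconds until the lock expires (negative if already expired).

    Returns None when no lock expiry is available.
    Assumes `locked_until_utc` is timezone-aware (UTC).
    """
    if locked_until_utc is None:
        return None
    if now is None:
        now = datetime.now(timezone.utc)
    return (locked_until_utc - now).total_seconds()


if __name__ == "__main__":
    # Example with a made-up expiry five minutes in the future.
    expiry = datetime.now(timezone.utc) + timedelta(minutes=5)
    print(f"lock time left: {lock_seconds_left(expiry):.0f}s")
```

In the loop above this could be called as `lock_seconds_left(message.locked_until_utc)` before and after `executor(...)`. If the second value is negative, the lock was not renewed while the executor was running.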

python-3.x azure azureservicebus azure-servicebus-queues azure-servicebus-topics
1 Answer

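I tried the code below to process messages from an Azure Service Bus topic and complete them after 5 minutes. I am using AutoLockRenewer to automatically extend the message lock for up to 60 minutes while a message is being processed.

Code: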

import asyncio
import logging
from azure.servicebus.aio import ServiceBusClient, AutoLockRenewer
from contextlib import contextmanager
import tempfile
import shutil
import os
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

SUBSCRIPTION_NAME = "<sub_name>"
TOPIC_NAME = "<topic_name>"
CONNECTION_STRING = "<Connec_Str>"

@contextmanager
def make_temp_directory():
    temp_dir = tempfile.mkdtemp()
    try:
        yield temp_dir
    finally:
        shutil.rmtree(temp_dir)

def executor(topic_name, message_data, temp_dir):
    logger.info(f"Processing message from topic: {topic_name}, data: {message_data}, temp dir: {temp_dir}")
    with open(os.path.join(temp_dir, 'message.txt'), 'w') as f:
        f.write(str(message_data))

async def receive_messages_from_topic(servicebus_client: ServiceBusClient, topic_name: str, executor: any, duration_seconds: int = 300):
    async with servicebus_client:
        renewer = AutoLockRenewer(max_lock_renewal_duration=3600)
        async with servicebus_client.get_subscription_receiver(
            topic_name=topic_name,
            subscription_name=SUBSCRIPTION_NAME,
            auto_lock_renewer=renewer,
        ) as receiver:
            logger.info(f"Listening to topic: {topic_name} for subscription: {SUBSCRIPTION_NAME}")
            
            start_time = time.time()
            end_time = start_time + duration_seconds
            
            while time.time() < end_time:
                try:
                    messages = await receiver.receive_messages()
                    if not messages:
                        await asyncio.sleep(10) 
                        continue
                    
                    for message in messages:
                        message_content = str(message)
                        logger.info(f"Received message from topic: {topic_name}, msg: {message_content}")
                        
                        with make_temp_directory() as temp_dir:
                            try:
                                executor(topic_name, message_content, temp_dir)
                                await receiver.complete_message(message)
                            except Exception as e:
                                logger.exception(f"Error executing job for topic: {topic_name} message: {message.message_id}, {e}")
                                await receiver.dead_letter_message(message)
                except asyncio.CancelledError:
                    logger.info("Receive operation was cancelled.")
                    break
                except Exception as e:
                    logger.exception(f"Error processing message for topic: {topic_name}, {e}")

async def main():
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STRING)
    await receive_messages_from_topic(servicebus_client, TOPIC_NAME, executor)

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("Process interrupted by user.")

Output:

The code above ran successfully.
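One difference from the question is worth checking: the executor in this test returns almost immediately, while the question's executor runs for about 40 minutes as a plain synchronous call inside the coroutine. A synchronous call blocks the asyncio event loop, and no other task on that loop can run until it returns. Assuming the async AutoLockRenewer schedules its renewals as tasks on the same event loop (an assumption about SDK internals, not something verified here), a long blocking executor would keep the renewals from firing. The sketch below shows the effect without any Service Bus dependency: `heartbeat` is a hypothetical stand-in for the periodic renewal and `blocking_job` for the executor. It needs Python 3.9+ for `asyncio.to_thread`.

```python
import asyncio
import time
from typing import List


def blocking_job(seconds: float) -> str:
    # Hypothetical stand-in for a long, synchronous executor(...) call.
    time.sleep(seconds)
    return "done"


async def heartbeat(ticks: List[float], interval: float) -> None:
    # Stand-in for periodic lock renewal: record a tick every `interval` seconds.
    while True:
        await asyncio.sleep(interval)
        ticks.append(time.monotonic())


async def run_job(offload: bool, job_seconds: float = 0.5, interval: float = 0.05) -> int:
    # Returns how many heartbeat ticks happened while the job was running.
    ticks: List[float] = []
    task = asyncio.create_task(heartbeat(ticks, interval))
    await asyncio.sleep(0)  # give the heartbeat task a chance to start
    if offload:
        # Run the job in a worker thread so the event loop stays free.
        await asyncio.to_thread(blocking_job, job_seconds)
    else:
        # Call the job directly: the event loop is blocked until it returns.
        blocking_job(job_seconds)
    count = len(ticks)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return count


if __name__ == "__main__":
    print("ticks while blocked:  ", asyncio.run(run_job(offload=False)))
    print("ticks while offloaded:", asyncio.run(run_job(offload=True)))
```

If this is what is happening in the question's code, replacing the direct call with `await asyncio.to_thread(executor, topic_name, bytes, temp_dir)` may be enough to let the renewals run while the executor works.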
