如何访问配置为基于事件的扩展的 Azure 容器应用程序作业中的消息详细信息?

问题描述 投票:0回答:1

我已使用 Azure 服务总线队列中的消息配置了具有基于事件的扩展的 Azure 容器应用程序作业。当这些事件触发时,是否可以访问容器应用程序作业内的消息详细信息(例如消息正文、消息 ID、会话 ID、元数据)?如果是这样,我怎样才能实现这一目标?

azure azureservicebus azure-container-apps keda azure-container-app-jobs
1个回答
0
投票

是的,可以访问由 Azure 服务总线事件触发的 Azure 容器应用作业内的消息详细信息,例如消息正文消息 ID会话 ID元数据

以下是实现此目标的方法:

使用 Azure 服务总线 SDK 连接到服务总线队列并处理消息。这允许您在使用日志记录进行处理期间检索消息正文、消息 ID、会话 ID 和元数据等详细信息。

下面的示例代码用于处理启用会话的队列中的消息并记录消息 ID、会话 ID、正文和应用程序属性。

import os
import asyncio
import logging
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus.exceptions import SessionLockLostError

logging.basicConfig(
    level=logging.INFO,  # Use INFO or DEBUG level for more verbosity
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
    handlers=[
        logging.StreamHandler(),  # Logs to stdout
        logging.FileHandler("application.log")  # Logs to a file
    ]
)
logger = logging.getLogger("ContainerAppsLogger")
SERVICE_BUS_CONNECTION_STRING = "AzureServiceConnectionString"
QUEUE_NAME = "QUEUENAME"

async def process_message(message):
    """Process a single message."""
    try:
        if hasattr(message.body, "__iter__") and not isinstance(message.body, (str, bytes)):
            body = b"".join(message.body)
        else:
            body = message.body
        logger.info(f"Processing message ID: {message.message_id}")
        logger.info(f"Session ID: {message.session_id}")
        logger.info(f"Body: {body.decode('utf-8') if isinstance(body, bytes) else body}")
        logger.info(f"Application Properties: {message.application_properties}")

    except Exception as e:
        logger.error(f"Error processing message: {e}")

async def receive_messages():
    """Receive and process messages from the session-enabled queue."""
    logger.info("Starting message receiver...")
    async with ServiceBusClient.from_connection_string(SERVICE_BUS_CONNECTION_STRING) as client:
        while True:
            try:
                receiver = client.get_queue_receiver(queue_name=QUEUE_NAME, session_id="1")
                async with receiver:
                    session = receiver.session
                    logger.info(f"Session started with ID: {session.session_id}")
                    logger.info(f"Session State: {await session.get_state()}")

                    async for message in receiver:
                        await process_message(message)
                        await receiver.complete_message(message)
                        logger.info("Message processed successfully.")
                        await session.renew_lock()  # Renew the session lock frequently
            except SessionLockLostError as e:
                logger.warning("Session lock lost, re-establishing the session.")
                await asyncio.sleep(1)  # Add a short delay before retrying
            except Exception as e:
                logger.error(f"Unexpected error: {e}")
                await asyncio.sleep(1)

if __name__ == "__main__":
    logger.info("Application starting...")
    try:
        asyncio.run(receive_messages())
    except KeyboardInterrupt:
        logger.info("Application shutting down...")
    except Exception as e:
        logger.critical(f"Critical error: {e}")

请参阅此文档,了解如何在 Azure 服务总线中启用会话以及使用会话 ID 发送和接收消息。

构建图像:

docker build -t <registry-name>.azurecr.io/myapp1:v2 .

docker build -t .azurecr.io/myapp1:v2

使用以下命令登录 Azure 容器注册表 (ACR),然后推送映像:

az acr login --name <ACR_NAME>

az acr login --name <ACR_NAME>

使用

az containerapp job
命令创建容器应用程序作业,或通过 Azure 门户使用相应的 docker 映像来完成此操作。

Azure portal with Container App Job

启动作业并向服务总线队列发送消息。

end a message to the Service Bus Queue

您应该能够在“执行历史记录/日志”部分中看到容器应用程序作业的日志。

 Execution History

要查询 Azure Log Analytics 以获取包含消息正文、消息 ID、会话 ID 和元数据的日志,您可以从

ContainerAppConsoleLogs_CL
表/日志源中筛选和投影相关信息。这是查询:

ContainerAppConsoleLogs_CL
| where ContainerGroupName_s startswith "ravi56778554teja-"
| where Log_s contains "Message ID" or Log_s contains "Session ID" or Log_s contains "Body" or Log_s contains "Application Properties"
| project TimeGenerated, Log_s
| order by TimeGenerated asc

ContainerAppConsoleLogs_CL

请参阅此链接以使用 Azure 容器应用程序部署事件驱动作业。

有关更多详细信息,请参阅此链接了解 Azure 容器应用程序中的应用程序日志记录。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.