我已使用 Azure 服务总线队列中的消息配置了具有基于事件的扩展的 Azure 容器应用程序作业。当这些事件触发时,是否可以访问容器应用程序作业内的消息详细信息(例如消息正文、消息 ID、会话 ID、元数据)?如果是这样,我怎样才能实现这一目标?
是的,可以访问由 Azure 服务总线事件触发的 Azure 容器应用作业内的消息详细信息,例如消息正文、消息 ID、会话 ID 和元数据。
以下是实现此目标的方法:
使用 Azure 服务总线 SDK 连接到服务总线队列并处理消息。这允许您在使用日志记录进行处理期间检索消息正文、消息 ID、会话 ID 和元数据等详细信息。
下面的示例代码用于处理启用会话的队列中的消息并记录消息 ID、会话 ID、正文和应用程序属性。
import os
import asyncio
import logging
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus.exceptions import SessionLockLostError
logging.basicConfig(
level=logging.INFO, # Use INFO or DEBUG level for more verbosity
format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
handlers=[
logging.StreamHandler(), # Logs to stdout
logging.FileHandler("application.log") # Logs to a file
]
)
logger = logging.getLogger("ContainerAppsLogger")
SERVICE_BUS_CONNECTION_STRING = "AzureServiceConnectionString"
QUEUE_NAME = "QUEUENAME"
async def process_message(message):
"""Process a single message."""
try:
if hasattr(message.body, "__iter__") and not isinstance(message.body, (str, bytes)):
body = b"".join(message.body)
else:
body = message.body
logger.info(f"Processing message ID: {message.message_id}")
logger.info(f"Session ID: {message.session_id}")
logger.info(f"Body: {body.decode('utf-8') if isinstance(body, bytes) else body}")
logger.info(f"Application Properties: {message.application_properties}")
except Exception as e:
logger.error(f"Error processing message: {e}")
async def receive_messages():
"""Receive and process messages from the session-enabled queue."""
logger.info("Starting message receiver...")
async with ServiceBusClient.from_connection_string(SERVICE_BUS_CONNECTION_STRING) as client:
while True:
try:
receiver = client.get_queue_receiver(queue_name=QUEUE_NAME, session_id="1")
async with receiver:
session = receiver.session
logger.info(f"Session started with ID: {session.session_id}")
logger.info(f"Session State: {await session.get_state()}")
async for message in receiver:
await process_message(message)
await receiver.complete_message(message)
logger.info("Message processed successfully.")
await session.renew_lock() # Renew the session lock frequently
except SessionLockLostError as e:
logger.warning("Session lock lost, re-establishing the session.")
await asyncio.sleep(1) # Add a short delay before retrying
except Exception as e:
logger.error(f"Unexpected error: {e}")
await asyncio.sleep(1)
if __name__ == "__main__":
logger.info("Application starting...")
try:
asyncio.run(receive_messages())
except KeyboardInterrupt:
logger.info("Application shutting down...")
except Exception as e:
logger.critical(f"Critical error: {e}")
请参阅此文档,了解如何在 Azure 服务总线中启用会话以及使用会话 ID 发送和接收消息。
构建图像:
docker build -t <registry-name>.azurecr.io/myapp1:v2 .
使用以下命令登录 Azure 容器注册表 (ACR),然后推送映像:
az acr login --name <ACR_NAME>
使用
az containerapp job
命令创建容器应用程序作业,或通过 Azure 门户使用相应的 docker 映像来完成此操作。
启动作业并向服务总线队列发送消息。
您应该能够在“执行历史记录/日志”部分中看到容器应用程序作业的日志。
要查询 Azure Log Analytics 以获取包含消息正文、消息 ID、会话 ID 和元数据的日志,您可以从
ContainerAppConsoleLogs_CL
表/日志源中筛选和投影相关信息。这是查询:
ContainerAppConsoleLogs_CL
| where ContainerGroupName_s startswith "ravi56778554teja-"
| where Log_s contains "Message ID" or Log_s contains "Session ID" or Log_s contains "Body" or Log_s contains "Application Properties"
| project TimeGenerated, Log_s
| order by TimeGenerated asc
请参阅此链接以使用 Azure 容器应用程序部署事件驱动作业。
有关更多详细信息,请参阅此链接了解 Azure 容器应用程序中的应用程序日志记录。