我编写了 lambda 函数,该函数从 SQS 读取消息、解码并启动 Step 函数。 当我在 AWS Lambda 中手动运行它时,一切都正确执行,因此消息被读取、解码并执行步骤函数。但是,当我将 lambda 触发器添加到 SQS 时,它根本不执行我的步骤函数。我可以在 AWS Lambda 的 Monitor 中看到我的函数被正确调用,但它没有执行步骤函数。为什么会这样以及如何解决?
import logging
import boto3
import base64
import json
import os
import ast
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def get_queue_url(queue_name):
"""
Get the URL of the queue dynamically based on its name.
"""
sqs = boto3.client('sqs')
response = sqs.get_queue_url(QueueName=queue_name)
return response['QueueUrl']
def decode_celery_message(body):
"""
Decode and parse the Celery message.
"""
decoded_body = base64.b64decode(body).decode('utf-8')
try:
parsed_body = json.loads(decoded_body)
if 'kwargsrepr' in parsed_body['headers']:
kwargsrepr = parsed_body['headers']['kwargsrepr']
task_id = parsed_body['headers']['id']
kwargsrepr_dict = ast.literal_eval(kwargsrepr)
kwargsrepr_dict['task_id'] = task_id
return kwargsrepr_dict
else:
logger.error("'kwargsrepr' not found in the message headers.")
return None
except json.JSONDecodeError as e:
logger.error("Error decoding JSON: %s", e)
return None
def start_step_function_execution(parsed_message):
"""
Start the execution of the Step Function.
"""
stepfunctions_client = boto3.client('stepfunctions')
logger.info(f"Starting Step Function execution with input: {parsed_message}")
stepfunctions_client.start_execution(
stateMachineArn=os.environ['STEP_FUNCTION_ARN'],
input=json.dumps(parsed_message)
)
logger.info("Step Function execution started successfully.")
return parsed_message
def display_message_available(event, context):
queue_name = os.environ["QUEUE_NAME"] # Queue name
queue_url = get_queue_url(queue_name)
sqs = boto3.client('sqs')
logger.info(f"Event details: {event}")
all_messages = [] # List to store all decoded messages with commands
while True:
# Receive messages from the queue for processing
response = sqs.receive_message(
QueueUrl=queue_url,
)
# Check if any messages were received
if 'Messages' in response:
for message in response['Messages']:
body = message['Body']
receipt_handle = message['ReceiptHandle']
# Decode and parse the message, get the command
parsed_message = decode_celery_message(body)
if parsed_message:
all_messages.append((parsed_message))
# Delete the message from the queue after processing
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=receipt_handle
)
logger.info(f'Message {body} deleted from the {queue_name} after processing.')
# Start Step Function execution
start_step_function_execution(parsed_message)
logger.info(f"Step Function execution started with input: {parsed_message}")
else:
logger.info("No more messages available in the queue.")
break # Exit the loop if no more messages are available
return all_messages
# This is necessary for Lambda to know what function to invoke
lambda_handler = display_message_available
我尝试分离我的函数来开始执行,效果很好,但前提是我手动运行 lambda。我已检查 IAM 权限,并已授予 AdministratorAccess 以确保我拥有适当的权限。以下是我的 IAM 角色的屏幕截图。
我想了解为什么在启用 Lamda 触发器的情况下不执行 Step 函数以及如何解决我的问题?
如果我明白你想要做什么,我认为EventBridge Pipes是最好的解决方案。这提供了一种完全托管的方式来从 SQS 队列(以及其他源)触发 Step Functions 状态机。
您可以在这里找到一个简单的示例。
您有一些用于转换 SQS 消息的选项(您在decode_celery_message 方法中包含的内容)。您也许可以仅使用 Step Functions 目标的输入转换。您也可以使用管道的可选丰富功能在Lambda函数中(或在API网关中托管的单独状态机或API中或通过API目标)进行转换。我建议您首先从输入转换开始,但正确的选择将取决于您的具体需求。