AWS Lambda 触发器不执行 AWS Step 函数

问题描述 投票:0回答:1

我编写了 lambda 函数,该函数从 SQS 读取消息、解码并启动 Step 函数。 当我在 AWS Lambda 中手动运行它时,一切都正确执行,因此消息被读取、解码并执行步骤函数。但是,当我将 lambda 触发器添加到 SQS 时,它根本不执行我的步骤函数。我可以在 AWS Lambda 的 Monitor 中看到我的函数被正确调用,但它没有执行步骤函数。为什么会这样以及如何解决?

import logging
import boto3
import base64
import json
import os
import ast

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def get_queue_url(queue_name):
    """
    Get the URL of the queue dynamically based on its name.
    """
    sqs = boto3.client('sqs')
    response = sqs.get_queue_url(QueueName=queue_name)
    return response['QueueUrl']


def decode_celery_message(body):
    """
    Decode and parse the Celery message.
    """
    decoded_body = base64.b64decode(body).decode('utf-8')
    try:
        parsed_body = json.loads(decoded_body)
        if 'kwargsrepr' in parsed_body['headers']:
            kwargsrepr = parsed_body['headers']['kwargsrepr']
            task_id = parsed_body['headers']['id']
            kwargsrepr_dict = ast.literal_eval(kwargsrepr)
            kwargsrepr_dict['task_id'] = task_id
            return kwargsrepr_dict
        else:
            logger.error("'kwargsrepr' not found in the message headers.")
            return None
    except json.JSONDecodeError as e:
        logger.error("Error decoding JSON: %s", e)
        return None


def start_step_function_execution(parsed_message):
    """
    Start the execution of the Step Function.
    """
    stepfunctions_client = boto3.client('stepfunctions')
    logger.info(f"Starting Step Function execution with input: {parsed_message}")
    stepfunctions_client.start_execution(
        stateMachineArn=os.environ['STEP_FUNCTION_ARN'],
        input=json.dumps(parsed_message) 
    )
    logger.info("Step Function execution started successfully.")
    return parsed_message


def display_message_available(event, context):
    queue_name = os.environ["QUEUE_NAME"]  # Queue name
    queue_url = get_queue_url(queue_name)
    sqs = boto3.client('sqs')

    logger.info(f"Event details: {event}")
    
    all_messages = []  # List to store all decoded messages with commands
    
    while True:
        # Receive messages from the queue for processing
        response = sqs.receive_message(
            QueueUrl=queue_url,
        )
        
        # Check if any messages were received
        if 'Messages' in response:
            for message in response['Messages']:
                body = message['Body']
                receipt_handle = message['ReceiptHandle']
                
                # Decode and parse the message, get the command
                parsed_message = decode_celery_message(body)
                if parsed_message:
                    all_messages.append((parsed_message))
                    
                
                # Delete the message from the queue after processing
                sqs.delete_message(
                    QueueUrl=queue_url,
                    ReceiptHandle=receipt_handle
                )
                logger.info(f'Message {body} deleted from the {queue_name} after processing.')
                # Start Step Function execution
                start_step_function_execution(parsed_message)
                logger.info(f"Step Function execution started with input: {parsed_message}")
        else:
            logger.info("No more messages available in the queue.")
            break  # Exit the loop if no more messages are available
        

    return all_messages


# This is necessary for Lambda to know what function to invoke
lambda_handler = display_message_available

我尝试分离我的函数来开始执行,效果很好,但前提是我手动运行 lambda。我已检查 IAM 权限,并已授予 AdministratorAccess 以确保我拥有适当的权限。以下是我的 IAM 角色的屏幕截图。

我想了解为什么在启用 Lamda 触发器的情况下不执行 Step 函数以及如何解决我的问题?

在此输入图片描述

python amazon-web-services aws-lambda aws-step-functions
1个回答
0
投票

如果我明白你想要做什么,我认为EventBridge Pipes是最好的解决方案。这提供了一种完全托管的方式来从 SQS 队列(以及其他源)触发 Step Functions 状态机。

您可以在这里找到一个简单的示例

您有一些用于转换 SQS 消息的选项(您在decode_celery_message 方法中包含的内容)。您也许可以仅使用 Step Functions 目标的输入转换。您也可以使用管道的可选丰富功能Lambda函数中(或在API网关中托管的单独状态机或API中或通过API目标)进行转换。我建议您首先从输入转换开始,但正确的选择将取决于您的具体需求。

© www.soinside.com 2019 - 2024. All rights reserved.