在同一工作流程中发送和接收消息

问题描述 投票:0回答:1

我有一个执行 QLDB 事务的 Lambda 函数。由于 QLDB 使用 OCC(乐观并发控制),因此需要在前面添加 SQS Fifo,以确保按顺序处理潜在的冲突事务(在我的例子中:附加到特定用户的事务)。

使用SQS

SendMessage.waitForTaskToken
可以解决这个问题,但是需要使用多个工作流程。

是否可以添加到SQS并等待SQS发出刚刚添加的消息?我试图避免拆分此步骤功能。

amazon-web-services aws-lambda amazon-sqs aws-step-functions
1个回答
0
投票

根据我的知识,您可以创建一个解决方案,将消息发送到 SQS FIFO 队列并使用 Lambda 函数按顺序处理它们,而不是拆分步骤函数。 Lambda 函数可以确保一次处理附加到特定用户的交易。

让我们看看一些可以解决此问题的方法:

  1. 创建SQS FIFO队列
  2. 发送消息到SQS FIFO队列
  3. 触发Lamda功能
  4. 顺序处理消息。

#SendMessage to SQS FIFO:

import boto3
import json

sqs = boto3.client('sqs')
queue_url = 'https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>.fifo'

def send_message(user_id, transaction_data):
    response = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(transaction_data),
        MessageGroupId=str(user_id),
        MessageDeduplicationId=str(user_id) + "-" + str(transaction_data['transaction_id'])
    )
    return response


#Lambda Function to Process SQS Messages:
import boto3
import json
from qldb_helpers import process_transaction  # Your QLDB transaction logic

def lambda_handler(event, context):
    for record in event['Records']:
        message_body = json.loads(record['body'])
        user_id = record['messageAttributes']['MessageGroupId']['stringValue']
        
        # Process the QLDB transaction
        process_transaction(user_id, message_body)
        
        # Delete the message after successful processing
        sqs = boto3.client('sqs')
        sqs.delete_message(
            QueueUrl=record['eventSourceARN'],
            ReceiptHandle=record['receiptHandle']
        )

© www.soinside.com 2019 - 2024. All rights reserved.