我有一个执行 QLDB 事务的 Lambda 函数。由于 QLDB 使用 OCC(乐观并发控制),因此需要在前面添加 SQS Fifo,以确保按顺序处理潜在的冲突事务(在我的例子中:附加到特定用户的事务)。
使用SQS
SendMessage.waitForTaskToken
可以解决这个问题,但是需要使用多个工作流程。
是否可以添加到SQS并等待SQS发出刚刚添加的消息?我试图避免拆分此步骤功能。
根据我的知识,您可以创建一个解决方案,将消息发送到 SQS FIFO 队列并使用 Lambda 函数按顺序处理它们,而不是拆分步骤函数。 Lambda 函数可以确保一次处理附加到特定用户的交易。
让我们看看一些可以解决此问题的方法:
#SendMessage to SQS FIFO:
import boto3
import json
sqs = boto3.client('sqs')
queue_url = 'https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>.fifo'
def send_message(user_id, transaction_data):
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(transaction_data),
MessageGroupId=str(user_id),
MessageDeduplicationId=str(user_id) + "-" + str(transaction_data['transaction_id'])
)
return response
#Lambda Function to Process SQS Messages:
import boto3
import json
from qldb_helpers import process_transaction # Your QLDB transaction logic
def lambda_handler(event, context):
for record in event['Records']:
message_body = json.loads(record['body'])
user_id = record['messageAttributes']['MessageGroupId']['stringValue']
# Process the QLDB transaction
process_transaction(user_id, message_body)
# Delete the message after successful processing
sqs = boto3.client('sqs')
sqs.delete_message(
QueueUrl=record['eventSourceARN'],
ReceiptHandle=record['receiptHandle']
)