{
"Comment": "Run Redshift Queries",
"StartAt": "ReceiveMessage from SQS",
"States": {
"ReceiveMessage from SQS": {
"Type": "Task",
"Parameters": {
"QueueUrl": "******"
},
"Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
"Next": "Run Analysis Queries",
"ResultSelector": {
"body.$": "States.StringToJson($.Messages[0].Body)"
}
},
"Run Analysis Queries": {
"Type": "Task",
"Parameters": {
"ClusterIdentifier": "******",
"Database": "prod",
"Sql": "select * from ******"
},
"Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
"End": true
}
},
"TimeoutSeconds": 3600
}
我只是进行了一个测试,看来这些消息暂时下降了,但随后又上升了。
是在“ SQS的收款人”阶段和红移阶段之间插入lambda的最佳方法?这提出了另一个问题。我只手动运行这个。如何最终激活此步骤功能以在任何消息上运行?
如果您必须使用SQ,则需要具有Lambda功能才能充当代理。您将需要将队列设置为lambda触发器,并且需要编写一个可以解析SQS消息并将适当调用的lambda拨打到步骤函数startExecutionapi.
sqs:deleteMessage
1
ResultPath
,并且and and与$
"ReceiveMessage from SQS": {
"Type": "Task",
"Parameters": {
"MaxNumberOfMessages": 1,
"QueueUrl": "******"
},
"Resource": "arn:aws:states:::aws-sdk:sqs:receiveMessage",
"Next": "Run Analysis Queries",
"ResultSelector": {
"body.$": "States.StringToJson($.Messages[0].Body)"
}
},
"Run Analysis Queries": {
"Type": "Task",
"Parameters": {
"ClusterIdentifier": "******",
"Database": "prod",
"Sql": "select * from ******"
},
"Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
"ResultPath": "$.redshift_output",
"Next": "delete_sqs"
},
"delete_sqs": {
"Comment": "Deletes SQS message",
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:sqs:deleteMessage",
"Parameters": {
"ReceiptHandle.$": "$.Messages[0].ReceiptHandle",
"QueueUrl": "******"
},
"ResultPath": null,
"Next": "update_result"
}
此外,您可以一次读取多达10条消息
MaxNumberOfMessages
等于此示例中的地图步骤:
10
您还可以使用
eventbridgetPipes(一种多功能事件驱动的实用程序)来传递SQS-> sfn的事件。 AWSLABS团队活动具有
CDK构造(awsdocs
)。 ts这看起来像: