我想在SQS收到新消息时接收并触发电子邮件,现在我已经运行了nodejs服务器,我怎样才能使它工作?我真的不想触发该功能。但我希望当 SQS 中有新消息时,该消费者将消费并执行发送电子邮件的业务逻辑。
但我的功能没有得到任何触发。注意:我没有调用这个函数,我希望它在 SQS 有新消息时自动触发。
const AWS = require('aws-sdk');
const mongoose = require('mongoose');
//
// Configure the aws details
//
AWS.config.update({
region: process.env['AWS_REGION'],
accessKeyId: process.env['AWS_ACCESS_KEY_ID'],
secretAccessKey: process.env['AWS_SECRET_ACCESS_KEY']
});
const sqs = new AWS.SQS({apiVersion: '2012-11-05'});
var queueURL = "https://sqs.us-east-1.amazonaws.com/xxxxx/demo-lambda-to-email-sqs"
var params = {
AttributeNames: [
"SentTimestamp"
],
MaxNumberOfMessages: 1,
MessageAttributeNames: [
"All"
],
QueueUrl: queueURL,
VisibilityTimeout: 20,
WaitTimeSeconds: 0
};
sqs.receiveMessage(params, function(err, data) {
if (err) {
console.log("Receive Error", err);
} else if (data.Messages) {
console.log('--------------------------- MESSAGE RECEIVED -------------')
var deleteParams = {
QueueUrl: queueURL,
ReceiptHandle: data.Messages[0].ReceiptHandle
};
sqs.deleteMessage(deleteParams, function(err, data) {
if (err) {
console.log("Delete Error", err);
} else {
console.log("Message Deleted", data);
}
});
}
});
SQS 是一种排队服务,因此需要通过基于拉的机制而不是基于推的机制来使用它。
仅当您具有轮询 SQS 队列的功能时才能调用此函数,然后在消息传入时触发该函数。
如果您不想维护使用者脚本,您应该考虑将此脚本迁移到 Lambda 函数中。使用此选项时,Lambda 服务将充当队列的使用者,并仅在添加消息时触发 Lambda 函数。
文档中提供了有关将 AWS Lambda 与 SQS 队列结合使用的更多信息。
您只需调用一次而不使用长轮询。所以它开始并得到一个空响应。因此,您需要一种基于拉取的机制,基本实现可以在 SetInterval 内运行 receiveMessage。比如:
setInterval(function() {
sqs.receiveMessage(params, function(err, data){
// your logic here!
});
}, 10000);
// Run every 10s
WaitTimeSeconds 大于 1 秒可实现长轮询,并通过消除空响应数量来帮助降低使用 Amazon SQS 的成本。
以下代码每 30 秒调用 SQS 请求消息。每次通话最多等待 20 秒才能收到消息。
const AWS = require('aws-sdk')
AWS.config.update({
region: 'us-east-1',
accessKeyId: '...',
secretAccessKey: '...'
})
const sqs = new AWS.SQS()
receiveMessage = () => sqs.receiveMessage({
QueueUrl: 'https://sqs.us-east-1.amazonaws.com/.../...',
WaitTimeSeconds: 20
}, (error, data) => {
if (error) console.error("ERROR:", error)
if (data.Messages) data.Messages.forEach(m => console.info(m.Body))
})
setInterval(receiveMessage, 30000)
这就是我使用 CloudFormation 做到的:
Resources:
ApplicationStreamListener:
Type: AWS::Serverless::Function
Properties:
Description: !Sub ${Environment}
CodeUri: src/lambdas
Handler: application-table-saved-event.default
Policies:
- AWSLambda_FullAccess
- AmazonDynamoDBFullAccess
Metadata:
BuildMethod: esbuild
BuildProperties:
Minify: true
Target: "es2020"
Sourcemap: true
EntryPoints:
- application-table-saved-event.ts
External:
- "@aws-sdk/*"
ApplicationTable:
Type: AWS::DynamoDB::Table
Properties:
AttributeDefinitions:
- AttributeName: pk
AttributeType: S
- AttributeName: sk
AttributeType: S
KeySchema:
- AttributeName: pk
KeyType: HASH
- AttributeName: sk
KeyType: RANGE
BillingMode: PAY_PER_REQUEST
StreamSpecification:
StreamViewType: NEW_IMAGE #active streams for this table
ApplicationTableStream:
Type: AWS::Lambda::EventSourceMapping
Properties:
BatchSize: 1 #trigger one lambda per document
Enabled: True
FilterCriteria:
Filters:
- Pattern: '{"eventName": ["INSERT", "MODIFY"], "dynamodb": { "NewImage": { "paymentTime": { "S": [ { "exists": true } ] }}}}'
EventSourceArn: #trigger event from the data table
Fn::GetAtt: [ApplicationTable, StreamArn]
FunctionName: !Ref ApplicationStreamListener
StartingPosition: LATEST #always start at the tail of the stream