我如何在现有的nodejs服务器上使用sqs消息

问题描述 投票:0回答:4

我想在SQS收到新消息时接收并触发电子邮件,现在我已经运行了nodejs服务器,我怎样才能使它工作?我真的不想触发该功能。但我希望当 SQS 中有新消息时,该消费者将消费并执行发送电子邮件的业务逻辑。

但我的功能没有得到任何触发。注意:我没有调用这个函数,我希望它在 SQS 有新消息时自动触发。

const AWS = require('aws-sdk');
const mongoose = require('mongoose');

//
// Configure the aws details
//
AWS.config.update({
    region: process.env['AWS_REGION'],
    accessKeyId: process.env['AWS_ACCESS_KEY_ID'],
    secretAccessKey: process.env['AWS_SECRET_ACCESS_KEY']
  });


const sqs = new AWS.SQS({apiVersion: '2012-11-05'});

var queueURL = "https://sqs.us-east-1.amazonaws.com/xxxxx/demo-lambda-to-email-sqs"



var params = {
    AttributeNames: [
        "SentTimestamp"
     ],
    MaxNumberOfMessages: 1,
    MessageAttributeNames: [
       "All"
    ],
    QueueUrl: queueURL,
    VisibilityTimeout: 20,
    WaitTimeSeconds: 0
   };

   sqs.receiveMessage(params, function(err, data) {
    if (err) {
      console.log("Receive Error", err);
    } else if (data.Messages) {
      console.log('--------------------------- MESSAGE RECEIVED -------------')
      var deleteParams = {
        QueueUrl: queueURL,
        ReceiptHandle: data.Messages[0].ReceiptHandle
      };
      sqs.deleteMessage(deleteParams, function(err, data) {
        if (err) {
          console.log("Delete Error", err);
        } else {
          console.log("Message Deleted", data);
        }
      });
    }
  });
javascript node.js amazon-web-services amazon-sqs
4个回答
2
投票

SQS 是一种排队服务,因此需要通过基于拉的机制而不是基于推的机制来使用它。

仅当您具有轮询 SQS 队列的功能时才能调用此函数,然后在消息传入时触发该函数。

如果您不想维护使用者脚本,您应该考虑将此脚本迁移到 Lambda 函数中。使用此选项时,Lambda 服务将充当队列的使用者,并仅在添加消息时触发 Lambda 函数。

文档中提供了有关将 AWS Lambda 与 SQS 队列结合使用的更多信息。


1
投票

您只需调用一次而不使用长轮询。所以它开始并得到一个空响应。因此,您需要一种基于拉取的机制,基本实现可以在 SetInterval 内运行 receiveMessage。比如:

setInterval(function() {
    sqs.receiveMessage(params, function(err, data){
      // your logic here!
    });
}, 10000);
// Run every 10s

WaitTimeSeconds 大于 1 秒可实现长轮询,并通过消除空响应数量来帮助降低使用 Amazon SQS 的成本。


0
投票

以下代码每 30 秒调用 SQS 请求消息。每次通话最多等待 20 秒才能收到消息。

const AWS = require('aws-sdk')
AWS.config.update({
    region: 'us-east-1',
    accessKeyId: '...',
    secretAccessKey: '...'
})
const sqs = new AWS.SQS()

receiveMessage = () => sqs.receiveMessage({
    QueueUrl: 'https://sqs.us-east-1.amazonaws.com/.../...',
    WaitTimeSeconds: 20
}, (error, data) => {
    if (error) console.error("ERROR:", error)
    if (data.Messages) data.Messages.forEach(m => console.info(m.Body))
})

setInterval(receiveMessage, 30000)

0
投票

这就是我使用 CloudFormation 做到的:

Resources:

  ApplicationStreamListener:
    Type: AWS::Serverless::Function
    Properties:
      Description: !Sub ${Environment}
      CodeUri: src/lambdas
      Handler: application-table-saved-event.default
      Policies:
        - AWSLambda_FullAccess
        - AmazonDynamoDBFullAccess
    Metadata:
      BuildMethod: esbuild
      BuildProperties:
        Minify: true
        Target: "es2020"
        Sourcemap: true
        EntryPoints:
          - application-table-saved-event.ts
        External:
          - "@aws-sdk/*"

  ApplicationTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: pk
          AttributeType: S
        - AttributeName: sk
          AttributeType: S
      KeySchema:
        - AttributeName: pk
          KeyType: HASH
        - AttributeName: sk
          KeyType: RANGE
      BillingMode: PAY_PER_REQUEST
      StreamSpecification:
        StreamViewType: NEW_IMAGE #active streams for this table

  ApplicationTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1 #trigger one lambda per document
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"eventName": ["INSERT", "MODIFY"], "dynamodb": { "NewImage": { "paymentTime": { "S": [ { "exists": true } ] }}}}'
      EventSourceArn: #trigger event from the data table
        Fn::GetAtt: [ApplicationTable, StreamArn]
      FunctionName: !Ref ApplicationStreamListener
      StartingPosition: LATEST #always start at the tail of the stream
© www.soinside.com 2019 - 2024. All rights reserved.