Kinesis Stream 上 PutRecord 的 Lambda 函数 ETIMEDOUT 错误

问题描述 投票:0回答:1

我正在用 JS 编写一个非常简单的 lambda。其主要目的是从 SQS 获取消息(我从 AWS 控制台手动触发),然后通过 put 记录发布到 kinesis 流。我已向 lambda 授予 PutRecord 所需的权限。但大多数时候我都会收到以下错误。

  "errorType": "Runtime.UnhandledPromiseRejection",
  "errorMessage": "Error [ERR_HTTP2_STREAM_CANCEL]: The pending stream has been canceled (caused by: connect ETIMEDOUT 34.223.45.15:443)",
  "trace": [
    "Runtime.UnhandledPromiseRejection: Error [ERR_HTTP2_STREAM_CANCEL]: The pending stream has been canceled (caused by: connect ETIMEDOUT 34.223.45.15:443)",
    "    at process.<anonymous> (file:///var/runtime/index.mjs:1276:17)",
    "    at process.emit (node:events:517:28)",
    "    at emit (node:internal/process/promises:149:20)",
    "    at processPromiseRejections (node:internal/process/promises:283:27)",
    "    at process.processTicksAndRejections (node:internal/process/task_queues:96:32)"
  ]
}

然而令人惊讶的是,有时它也会起作用,特别是当我通过将partitionKey更改为不同的随机值来部署时。

import { KinesisClient, PutRecordCommand } from "@aws-sdk/client-kinesis";

export const KINESIS_CLIENT = new KinesisClient([
  {
    httpOptions: {
      connectTimeout: 10000,
    },
    maxRetries: 1,
    region: 'us-west-2',
  },
]);

export const handler = async (event) => {

  const promises = [];

  for (const {messageId, body} of event.Records) {
    processEvent(body)
    promises.push(processEvent(body));
  }

  const responses = await Promise.allSettled(promises);
  responses.forEach((response) => {
    if (response.status !== "fulfilled") {
      throw Error(JSON.stringify(responses));
    }
  });
};

export async function processEvent(body) {
  const newBody = JSON.parse(body);
  newBody['field'] = 'Random';

  await KINESIS_CLIENT.send(
      new PutRecordCommand({
        Data: new TextEncoder().encode(JSON.stringify(newBody)),
        StreamName: 'InputEventStream',
        PartitionKey: '3', <--- I change this and sometimes after redeployment it seems to work
      }),
  );
}```

javascript amazon-web-services aws-lambda amazon-kinesis
1个回答
0
投票

虽然您已将 connectTimeout 设置为 10 秒,但您可能需要显式设置 socketTimeout(用于读/写操作)和 maxAttempts(用于重试)。

导出 const KINESIS_CLIENT = new KinesisClient({ 区域:“us-west-2”, maxAttempts: 3, // 允许更多次重试 requestHandler: 新的 NodeHttpHandler({ connectionTimeout: 20000, // 20秒连接超时 socketTimeout: 20000 // 20秒套接字超时 }) });

改进的错误处理

promises.push(processEvent(body).catch((err) => { console.error("处理记录时出错:", err); 返回错误; // 捕获错误而不是立即失败 }));

© www.soinside.com 2019 - 2024. All rights reserved.