我正在用 JS 编写一个非常简单的 lambda。其主要目的是从 SQS 获取消息(我从 AWS 控制台手动触发),然后通过 put 记录发布到 kinesis 流。我已向 lambda 授予 PutRecord 所需的权限。但大多数时候我都会收到以下错误。
"errorType": "Runtime.UnhandledPromiseRejection",
"errorMessage": "Error [ERR_HTTP2_STREAM_CANCEL]: The pending stream has been canceled (caused by: connect ETIMEDOUT 34.223.45.15:443)",
"trace": [
"Runtime.UnhandledPromiseRejection: Error [ERR_HTTP2_STREAM_CANCEL]: The pending stream has been canceled (caused by: connect ETIMEDOUT 34.223.45.15:443)",
" at process.<anonymous> (file:///var/runtime/index.mjs:1276:17)",
" at process.emit (node:events:517:28)",
" at emit (node:internal/process/promises:149:20)",
" at processPromiseRejections (node:internal/process/promises:283:27)",
" at process.processTicksAndRejections (node:internal/process/task_queues:96:32)"
]
}
然而令人惊讶的是,有时它也会起作用,特别是当我通过将partitionKey更改为不同的随机值来部署时。
import { KinesisClient, PutRecordCommand } from "@aws-sdk/client-kinesis";
export const KINESIS_CLIENT = new KinesisClient([
{
httpOptions: {
connectTimeout: 10000,
},
maxRetries: 1,
region: 'us-west-2',
},
]);
export const handler = async (event) => {
const promises = [];
for (const {messageId, body} of event.Records) {
processEvent(body)
promises.push(processEvent(body));
}
const responses = await Promise.allSettled(promises);
responses.forEach((response) => {
if (response.status !== "fulfilled") {
throw Error(JSON.stringify(responses));
}
});
};
export async function processEvent(body) {
const newBody = JSON.parse(body);
newBody['field'] = 'Random';
await KINESIS_CLIENT.send(
new PutRecordCommand({
Data: new TextEncoder().encode(JSON.stringify(newBody)),
StreamName: 'InputEventStream',
PartitionKey: '3', <--- I change this and sometimes after redeployment it seems to work
}),
);
}```
虽然您已将 connectTimeout 设置为 10 秒,但您可能需要显式设置 socketTimeout(用于读/写操作)和 maxAttempts(用于重试)。
导出 const KINESIS_CLIENT = new KinesisClient({ 区域:“us-west-2”, maxAttempts: 3, // 允许更多次重试 requestHandler: 新的 NodeHttpHandler({ connectionTimeout: 20000, // 20秒连接超时 socketTimeout: 20000 // 20秒套接字超时 }) });
改进的错误处理
promises.push(processEvent(body).catch((err) => { console.error("处理记录时出错:", err); 返回错误; // 捕获错误而不是立即失败 }));