我有一个非常简单的 lambda 函数(nodeJS),它将接收到的事件放入 kinesis 流中。这是源代码:
'use strict';
const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis({apiVersion: '2013-12-02'});
exports.handler = async (event, context, callback) => {
let body = JSON.parse(event.body);
let receptionDate = new Date().toISOString();
let partitionKey = "pKey-" + Math.floor(Math.random() * 10);
// Response format needed for API Gateway
const formatResponse = (status, responseBody) => {
return {
statusCode: status,
headers: { "Content-Type": "application/json" },
body: JSON.stringify(responseBody)
}
}
// body.events is an array of events. Just add the reception date in each events.
for(let e of body.events) {
e.reception_date = receptionDate;
}
console.log("put In kinesis stream");
let kinesisParams = {
Data: new Buffer(JSON.stringify(body) + "\n"),
PartitionKey: partitionKey,
StreamName: 'event_test'
};
kinesis.putRecord(kinesisParams, (err, res) => {
console.log("Kinesis.putRecord DONE");
if(err) {
console.log("putRecord Error:", JSON.stringify(err));
callback(null, formatResponse(500, "Internal Error: " + JSON.stringify(err)));
} else {
console.log("putRecord Success:", JSON.stringify(res));
callback(null, formatResponse(200));
}
});
};
执行此代码时,cloudwatch 中的日志如下:
START RequestId: 5d4d7526-1a40-401f-8417-06435f0e5408 Version: $LATEST
2019-01-11T09:39:11.925Z 5d4d7526-1a40-401f-8417-06435f0e5408 put In kinesis stream
END RequestId: 5d4d7526-1a40-401f-8417-06435f0e5408
REPORT RequestId: 5d4d7526-1a40-401f-8417-06435f0e5408 Duration: 519.65 ms Billed Duration: 600 ms Memory Size: 128 MB Max Memory Used: 28 MB
似乎 kinesis.putRecord 没有被调用...我在 kinesis 流日志中没有看到任何内容。我肯定有哪里错了,但我不知道哪里错了!
kinesis.putRecord
是一个异步操作,它在完成时(无论成功还是有错误)调用回调(第二个参数)。
async
函数是一个返回promise的函数。当这个 Promise 得到解决时,Lambda 将完成其执行,即使还有其他异步操作尚未完成。
由于您的函数没有返回任何内容,因此当函数结束时,承诺将立即得到解决,因此执行将立即完成 - 无需等待您的异步 kinesis.putRecord
任务。
使用
async
处理程序时,不需要调用回调。相反,您可以返回您想要的任何内容,或者抛出错误。 Lambda 会获取并分别响应。
所以你这里有 2 个选择:
await
,因此只需删除 async
即可。在这种情况下,Lambda 正在等待事件循环为空(除非您显式更改 context.callbackWaitsForEmptyEventLoop)kinesis.putRecord
更改为:let result;
try {
result = await kinesis.putRecord(kinesisParams).promise();
} catch (err) {
console.log("putRecord Error:", JSON.stringify(err));
throw Error(formatResponse(500, "Internal Error: " + JSON.stringify(err));
}
console.log("putRecord Success:", JSON.stringify(result));
return formatResponse(200);
在第二个选项中,lambda 将继续运行,直到
kinesis.putRecord
完成。
有关本例中 Lambda 行为的更多信息,您可以在 lambda 容器中的
/var/runtime/node_modules/awslambda/index.js
下查看执行处理程序的主要代码。
@ttulka 你能解释一下吗?提供建议或代码示例吗? – 阿达吉奥
这是关于 JavaScript 中异步处理的演变。
首先,一切都是通过回调完成的,这是最古老的方法。在任何地方使用回调都会导致“回调地狱”(http://callbackhell.com)。
然后引入了Promises。使用 Promise 看起来有点像使用 Monad,所有内容都被打包到一个“盒子”(Promise)中,因此您必须链接所有调用:
thisCallReturnsPromise(...)
.then(data => ...)
.then(data => ...)
.then(data => ...)
.catch(err => ...)
这对人类来说有点不自然,因此 ECMAScript 2017 提出了异步函数中的语法糖(async/await)https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/async_function
Async/await 语法允许您像使用普通同步代码一样使用异步 Promise:
const data = await thisCallReturnsPromise(...)
不要忘记,
await
调用必须位于异步函数内:
async () => {
const data = await thisCallReturnsPromise(...)
return await processDataAsynchronouslyInPromise(data)
}
AWS Lambda 支持 Node.js v8.10,它完全实现了此语法。
我也遇到了这个问题,原来是因为
.map()
map()
可以是异步的,但如果你想等待所有结果,你需要将你的map()函数包装在Promise.All()
之类的中
const results = await Promise.All(yourArray.map(async () => {})
刚刚找到解决方案:删除“async”关键字使其工作!
exports.handler = (event, context, callback) => { ... }