我正在读取存储在 AWS S3 上的 csv 文件。这些 csv 记录由一个名为 filterLogic() 的函数进行评估。未通过测试的记录还需要写入 AWS S3 上的错误报告 csv 文件中。csv 解析使用的是 fast-csv。
但是,在处理最后一条记录时,我得到了 "Error [ERR_STREAM_WRITE_AFTER_END]: write after end"。
我应该在哪里、以何种方式正确调用 csvwritestream.end()?
const AWS = require('aws-sdk');
const utils = require('./utils');
const csv = require('fast-csv');
const s3 = new AWS.S3();
exports.handler = async (event) => {
console.log("Incoming Event: ", JSON.stringify(event));
const bucket = event.Records[0].s3.bucket.name;
const filename = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
const message = `File is uploaded in - ${bucket} -> ${filename}`;
console.log(message);
const splittedFilename = filename.split('.');
const reportFilename = splittedFilename[0] + "Report." + splittedFilename[1];
const reportBucket = 'external.transactions.reports';
const csvwritestream = csv.format({ headers: true });
csvwritestream
.pipe(utils.uploadFromStream(s3, reportBucket, reportFilename))
.on('end', function () {
console.log("Report written to S3 " + reportFilename);
});
var request = s3.getObject({ Bucket: bucket, Key: filename });
var stream = request.createReadStream({ objectMode: true })
.pipe(csv.parse({ headers: true }))
.on('data', async function (data) {
stream.pause();
console.log("JSON: " + JSON.stringify(data));
var response = await utils.filterLogic(data);
if (response.statusCode !== 200) {
await csvwritestream.write(data);
console.log("Data: " + JSON.stringify(data) + " written.");
}
stream.resume();
})
.on('end', function(){
csvwritestream.end();
});
return new Promise(resolve => {
stream.on('close', async function () {
csvwritestream.end();
resolve();
});
});
};
出现这个错误的原因并不像看上去那么简单。主要问题在于,Node.js 的事件发射器(event emitter)不会等待(await)`data` 事件的侦听器——因此,您可能以为数据是按顺序逐条处理的,但实际上前一条记录的异步处理尚未完成,后续事件就已经触发了。
考虑到这一点就可以看出:`end` 事件会在最后一个 `data` 事件之后立即触发,于是最后处理的几条记录是在写入流已经关闭之后才执行写入的……
是的,我知道您确实调用了暂停,但在 `async function` 中,暂停生效时 Node 已经处理完了同步的部分,而 `end` 事件正是其中之一。
您可以实现一个 Transform 流,但就您的用例而言,这可能比直接使用另一个模块要复杂得多——例如我的 scramjet,它允许您在数据过滤器中运行异步代码。
Scramjet 会用下面代码中的这些方法创建带流量控制的流管道,因此您无需手动暂停/恢复——这些都已经替您处理好了。
您可能还想阅读 scramjet 的文档。改写后的代码如下:
const AWS = require('aws-sdk');
const utils = require('./utils');
const {StringStream} = require('scramjet');
const s3 = new AWS.S3();
exports.handler = async (event) => {
console.log("Incoming Event: ", JSON.stringify(event));
const bucket = event.Records[0].s3.bucket.name;
const filename = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
const message = `File is uploaded in - ${bucket} -> ${filename}`;
console.log(message);
const splittedFilename = filename.split('.');
const reportFilename = splittedFilename[0] + "Report." + splittedFilename[1];
const reportBucket = 'external.transactions.reports';
var request = s3.getObject({ Bucket: bucket, Key: filename });
var stream = StringStream
// create a StringStream from a scramjet stream
.from(
request.createReadStream()
)
// then just parse the data
.CSVParse({headers: true})
// then filter using asynchronous function as if it was an array
.filter(async data => {
var response = await utils.filterLogic(data);
return response.statusCode === 200;
})
// then stringify
.CSVStringify({headers: true})
// then upload
.pipe(
utils.uploadFromStream(s3, reportBucket, reportFilename)
);
return new Promise(
(res, rej) => stream
.on("finish", res)
.on("error", rej)
);
};
哦,对了,scramjet 只会增加 3 个依赖,所以您的 node_modules 不会变成那个相对论笑话。 ;)