Node.js: closing a write stream at the end of a read stream


I'm reading CSV files stored on AWS S3. Each CSV record is evaluated by a function called filterLogic(). Records that fail the check also need to be written to an error-report CSV file on AWS S3. For CSV parsing I'm using fast-csv.

However, for the last record I get "Error [ERR_STREAM_WRITE_AFTER_END]: write after end".

Where and how do I correctly call csvwritestream.end()?

const AWS = require('aws-sdk');
const utils = require('./utils');
const csv = require('fast-csv');
const s3 = new AWS.S3();

exports.handler = async (event) => {
    console.log("Incoming Event: ", JSON.stringify(event));
    const bucket = event.Records[0].s3.bucket.name;
    const filename = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
    const message = `File is uploaded in - ${bucket} -> ${filename}`;
    console.log(message);

    const splittedFilename = filename.split('.');
    const reportFilename = splittedFilename[0] + "Report." + splittedFilename[1];
    const reportBucket = 'external.transactions.reports';

    const csvwritestream = csv.format({ headers: true });
    csvwritestream
        .pipe(utils.uploadFromStream(s3, reportBucket, reportFilename))
        .on('end', function () {
            console.log("Report written to S3 " + reportFilename);
        });

    var request = s3.getObject({ Bucket: bucket, Key: filename });
    var stream = request.createReadStream({ objectMode: true })
        .pipe(csv.parse({ headers: true }))
        .on('data', async function (data) {
            stream.pause();
            console.log("JSON: " + JSON.stringify(data));
            var response = await utils.filterLogic(data);
            if (response.statusCode !== 200) {
                // for the last record this write runs after the 'end' handler
                // below has already called csvwritestream.end(), hence the error
                await csvwritestream.write(data);
                console.log("Data: " + JSON.stringify(data) + " written."); 
            }
            stream.resume();
        })
        .on('end', function () {
            // 'end' fires as soon as parsing finishes, before the last async
            // 'data' handler has resumed after its await
            csvwritestream.end();
        });
    return new Promise(resolve => {
        stream.on('close', async function () {
            csvwritestream.end(); // note: end() may already have been called above
            resolve();
        });
    });
};
node.js amazon-s3 async-await stream es6-promise
1 Answer

The cause of your error isn't that obvious. The main problem is that 'data' listeners on a Node.js event emitter are not awaited - so while you may want the records to be processed sequentially, the processing hasn't finished by the time the next event arrives. With that in mind you can see that the 'end' event fires right after the last 'data' event, so the writes for the last processed items happen after the write stream has already been closed... and yes, I know you do pause the stream, but inside an async function the pause only takes effect after Node has handled the synchronous work already queued - and 'end' is one of those events.
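A minimal sketch of that race, using only Node's built-in stream module (the names and the 10 ms delay are illustrative stand-ins, not from your code):

const { PassThrough } = require('stream');

const source = new PassThrough({ objectMode: true });

source.on('data', async function (record) {
    // everything after this await runs on a later tick...
    await new Promise(resolve => setTimeout(resolve, 10));
    console.log('processed', record); // ...so this logs AFTER 'end' below
});
source.on('end', function () {
    // a write stream ended here would reject the pending write above
    console.log('end fired');
});

source.write('last record');
source.end();

Running it prints "end fired" before "processed last record" - the same ordering that makes your last csvwritestream.write() land after csvwritestream.end().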

What you could do is implement a Transform stream, but given your use case that may be much more complex than just using another module - for example my scramjet, which lets you run asynchronous code in the data filter.
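For illustration, a rough sketch of that Transform approach, reusing utils.filterLogic() and the streams from your question (untested, just to show the shape):

const { Transform } = require('stream');

const filterTransform = new Transform({
    objectMode: true,
    transform(data, _encoding, callback) {
        // the stream waits for callback() before handing over the next record,
        // so ordering and backpressure work without manual pause()/resume()
        utils.filterLogic(data)
            .then(response => {
                if (response.statusCode !== 200) {
                    this.push(data); // forward only the failing records
                }
                callback();
            })
            .catch(callback);
    }
});

// request.createReadStream()
//     .pipe(csv.parse({ headers: true }))
//     .pipe(filterTransform)
//     .pipe(csvwritestream);

Because the parser is now piped all the way into csvwritestream, the pipe itself calls end() at the right moment and the manual end() calls go away.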

scramjet

Scramjet takes care of creating the whole stream pipeline with the approach above, so there's no need to pause/resume - all of that is already solved. With scramjet your handler could look like this:

const AWS = require('aws-sdk');
const utils = require('./utils');
const { StringStream } = require('scramjet');
const s3 = new AWS.S3();

exports.handler = async (event) => {
    console.log("Incoming Event: ", JSON.stringify(event));
    const bucket = event.Records[0].s3.bucket.name;
    const filename = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
    const message = `File is uploaded in - ${bucket} -> ${filename}`;
    console.log(message);

    const splittedFilename = filename.split('.');
    const reportFilename = splittedFilename[0] + "Report." + splittedFilename[1];
    const reportBucket = 'external.transactions.reports';

    var request = s3.getObject({ Bucket: bucket, Key: filename });
    var stream = StringStream
        // create a StringStream from the S3 read stream
        .from(request.createReadStream())
        // then just parse the data
        .CSVParse({ headers: true })
        // then filter using an asynchronous function, as if it were an array;
        // keep the records that fail the check, since those go to the report
        .filter(async data => {
            var response = await utils.filterLogic(data);
            return response.statusCode !== 200;
        })
        // then stringify
        .CSVStringify({ headers: true })
        // then upload
        .pipe(utils.uploadFromStream(s3, reportBucket, reportFilename));

    return new Promise((res, rej) => stream
        .on("finish", res)
        .on("error", rej)
    );
};

You may want to read:

  • DataStream..filter

Oh, and scramjet adds just 3 dependencies of its own, so your node_modules won't become the punchline of that joke. ;)
