节点中的createWriteStream似乎在“数据”之前执行了“结束”事件

问题描述 投票:0回答:1

我无法理解事件循环如何处理我的代码段。我想要实现的是

  • 从csv读取
  • 下载在csv中找到的资源
  • 将其上传到s3
  • 将其写入新的csv文件
const readAndUpload =  () => {
    fs.createReadStream('filename.csv')
    .pipe(csv())
    .on('data', ((row: any) => {
        const file = fs.createWriteStream("file.jpg");
        var url = new URL(row.imageURL)
        // choose whether to make an http or https request
        let client = (url.protocol=="https:") ? https : http
        const request = client.get(row.imageURL, function(response:any) {
            // file save
            response.pipe(file);
            console.log('file saved')
            let filePath = "file.jpg";
            let params = {
                Bucket: 'bucket-name',
                Body : fs.createReadStream(filePath),
                Key : "filename.jpg"
            };
            // upload to s3
            s3.upload(params, function (err: any, data: any) {
                //handle error
                if (err) {
                  console.log("Error", err);
                }

                //success
                if (data) {
                  console.log("Uploaded in:", data.Location);
                  row.imageURL = data.Location
                  writeData.push(row)
                //   console.log(writeData)
                }
              });
        });
    }))
    .on('end', () => {
        console.log("done reading")
        const csvWriter = createCsvWriter({
            path: 'out.csv',
            header: [
                {id: 'id', title: 'some title'}
            ]
        });
        csvWriter
            .writeRecords(writeData)
            .then(() => console.log("The CSV file was written successfully"))
    })
}

按照我添加的日志语句,done readingThe CSV file was written successfullyfile saved之前打印。我的理解是end事件是在data事件之后调用的,所以我不确定我要去哪里。

谢谢您的阅读!

node.js callback pipe event-loop
1个回答
0
投票

我不确定这是否是问题的一部分,但是在这部分代码中,您还有另外一组parens。更改此:

.on('data', ((row: any) => {
     .....
})).on('end', () => {

至此:

.on('data', (row: any) => {
     .....
}).on('end', () => {

并且,如果正确设置了事件处理程序,则对于同一流,将在.on('data', ...)之前调用.on('end', ....)事件处理程序。如果你这样说:

console.log('at start of data event handler');

作为该事件处理程序的第一行,您将看到它首先被调用。

但是,您的数据事件处理程序使用多个异步调用,您的代码中没有任何内容使end事件等待在data事件处理程序中完成所有处理。因此,由于该处理需要一段时间,因此很自然会在完成对end事件的所有异步代码之前,会发生data事件。


此外,如果您可以有多个data事件(通常会发生这种情况),那么由于要使用固定的文件名,因此您将同时运行多个data事件,它们可能会互相覆盖。


解决此类问题的常用方法是在stream.pause()事件处理开始时先data暂停读取流,然后在完成所有异步工作后,再按stream.resume()让它开始再去一次。

您需要获得正确的流才能暂停和继续。您可以执行以下操作:

let stream = fs.createReadStream('filename.csv')
 .pipe(csv());

stream.on('data', ((row: any) => {
   stream.pause();
   ....
});

然后,在s3.upload()回调中,您可以调用stream.resume()。您还将需要更好得多的错误处理,否则,如果遇到错误,事情只会被卡住。

看来您在呼叫时也遇到了其他并发问题:

response.pipe(file);

然后您尝试使用file,而没有实际等待该.pipe()操作完成(这也是异步的)。总体而言,这整个逻辑确实需要进行重大清理。我不明白您在所有不同步骤中到底想做什么,以了解如何编写完全干净和简单的版本。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.