如何使用async / await异步在node.js中创建ReadStream

问题描述 投票:3回答:2

我在使用fs.creadReadStream异步处理csv文件时遇到困难:

async function processData(row) {
    // perform some asynchronous function
    await someAsynchronousFunction();
}

fs.createReadStream('/file')
    .pipe(parse({
        delimiter: ',',
        columns: true
    })).on('data', async (row) => {
        await processData(row);
    }).on('end', () => {
        console.log('done processing!')
    })

我想在createReadStream到达on('end')之前一一读取每个记录后执行一些异步功能。

但是,在我所有数据完成处理之前,on('end')被命中。有人知道我可能做错了吗?

提前感谢!

node.js stream async-await
2个回答
3
投票

.on('data, ...)不等待您的await。请记住,async函数会立即返回一个承诺,而.on()并未对该承诺进行任何关注,因此它会保持愉快的状态。

[await仅在函数内部等待,它不会阻止函数立即返回,因此流认为您已处理数据并继续发送更多数据并生成更多data事件。

这里有几种可能的方法,但是最简单的方法可能是暂停流,直到完成processData(),然后重新启动流。

同样,processData()是否返回与异步操作完成相关的promise?这也是await能够完成其工作所必需的。

readable stream doc包含一个示例,该示例在data事件期间暂停流,然后在某些异步操作完成后恢复该流。这是他们的例子:

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
});

3
投票

最近我遇到了同样的问题。我通过使用承诺数组来修复它,并在触发.on("end")时等待它们全部解决。

import parse from "csv-parse";

export const parseCsv = () =>
  new Promise((resolve, reject) => {
    const promises = [];
    fs.createReadStream('/file')
      .pipe(parse({ delimiter: ',', columns: true }))
      .on("data", row => promises.push(processData(row)))
      .on("error", reject)
      .on("end", async () => {
        await Promise.all(promises);
        resolve();
      });
  });
© www.soinside.com 2019 - 2024. All rights reserved.