我在使用fs.creadReadStream
异步处理csv文件时遇到困难:
async function processData(row) {
// perform some asynchronous function
await someAsynchronousFunction();
}
fs.createReadStream('/file')
.pipe(parse({
delimiter: ',',
columns: true
})).on('data', async (row) => {
await processData(row);
}).on('end', () => {
console.log('done processing!')
})
我想在createReadStream
到达on('end')
之前一一读取每个记录后执行一些异步功能。
但是,在我所有数据完成处理之前,on('end')
被命中。有人知道我可能做错了吗?
提前感谢!
.on('data, ...)
不等待您的await
。请记住,async
函数会立即返回一个承诺,而.on()
并未对该承诺进行任何关注,因此它会保持愉快的状态。
[await
仅在函数内部等待,它不会阻止函数立即返回,因此流认为您已处理数据并继续发送更多数据并生成更多data
事件。
这里有几种可能的方法,但是最简单的方法可能是暂停流,直到完成processData()
,然后重新启动流。
同样,processData()
是否返回与异步操作完成相关的promise?这也是await
能够完成其工作所必需的。
readable stream doc包含一个示例,该示例在data
事件期间暂停流,然后在某些异步操作完成后恢复该流。这是他们的例子:
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
readable.pause();
console.log('There will be no additional data for 1 second.');
setTimeout(() => {
console.log('Now data will start flowing again.');
readable.resume();
}, 1000);
});
最近我遇到了同样的问题。我通过使用承诺数组来修复它,并在触发.on("end")
时等待它们全部解决。
import parse from "csv-parse";
export const parseCsv = () =>
new Promise((resolve, reject) => {
const promises = [];
fs.createReadStream('/file')
.pipe(parse({ delimiter: ',', columns: true }))
.on("data", row => promises.push(processData(row)))
.on("error", reject)
.on("end", async () => {
await Promise.all(promises);
resolve();
});
});