假设我有以下代码:
try {
let size = 0;
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source) {
for await (const chunk of source) {
size += chunk.length;
if (size >= 1000000) {
throw new Error('File is too big');
}
yield String(chunk).toUpperCase();
}
},
fs.createWriteStream('uppercase.txt')
);
console.log('Pipeline succeeded.');
} catch (error) {
console.log('got error:', error);
}
如何确定在每种情况下我都正确关闭了流? node docs并没有太大帮助-他们只是告诉我,我将有悬挂的事件监听器:
stream.pipeline()将在所有流上调用stream.destroy(err),除了:
已发出“结束”或“关闭”的可读流。
已发出“完成”或“关闭”的可写流。
stream.pipeline()在调用回调后将悬垂的事件侦听器留在流上。如果发生故障后重用流,这可能会导致事件侦听器泄漏和误吞的错误。
因此,我发现许多node.js流复合操作(例如pipeline()
和.pipe()
)在错误处理方面确实很糟糕/不完整。例如,如果您只是这样做:
fs.createReadStream("input.txt")
.pipe(fs.createWriteStream("output.txt"))
.on('error', err => {
console.log(err);
}).on('finish', () => {
console.log("all done");
});
[您希望打开readStream时出现错误,您会在此处的错误处理程序中看到该错误,但“不”不是这样。打开输入文件时将发生错误。
您必须单独保存readStream对象,并直接将错误处理程序附加到该对象,以查看该错误。因此,我只是不再相信这种复合材料,而该文档也从不真正解释如何进行正确的错误处理。我尝试查看pipeline()
的代码,以了解我是否能够理解错误处理,但这并没有证明是卓有成效的。]
所以,您的特定问题似乎可以通过转换流来完成:
const fs = require('fs'); const { Transform } = require('stream'); const myTransform = new Transform({ transform: function(chunk, encoding, callback) { let str = chunk.toString('utf8'); this.push(str.toUpperCase()); callback(); } }); function upperFile(input, output) { return new Promise((resolve, reject) => { // common function for cleaning up a partial output file function errCleanup(err) { fs.unlink(output, function(e) { if (e) console.log(e); reject(err); }); } let inputStream = fs.createReadStream(input, {encoding: 'utf8'}); let outputStream = fs.createWriteStream(output, {emitClose: true}); // have to separately listen for read/open errors inputStream.on("error", err => { // have to manually close writeStream when there was an error reading if (outputStream) outputStream.destroy(); errCleanup(err); }); inputStream.pipe(myTransform) .pipe(outputStream) .on("error", errCleanup) .on("close", resolve); }); } // sample usage upperFile("input.txt", "output.txt").then(() => { console.log("all done"); }).catch(err => { console.log("got error", err); });
如您所见,大约有2/3的代码以强大的方式处理错误。