Node-管道后正确关闭流

问题描述 投票:0回答:1

假设我有以下代码:

try {
    let size = 0;

    await pipeline(
        fs.createReadStream('lowercase.txt'),
        async function* (source) {
            for await (const chunk of source) {
                size += chunk.length;

                if (size >= 1000000) {
                    throw new Error('File is too big');
                }

                yield String(chunk).toUpperCase();
            }
        },
        fs.createWriteStream('uppercase.txt')
    );

    console.log('Pipeline succeeded.');
} catch (error) {
    console.log('got error:', error);
}

如何确定在每种情况下我都正确关闭了流? node docs并没有太大帮助-他们只是告诉我,我将有悬挂的事件监听器:

stream.pipeline()将在所有流上调用stream.destroy(err),除了:

已发出“结束”或“关闭”的可读流。

已发出“完成”或“关闭”的可写流。

stream.pipeline()在调用回调后将悬垂的事件侦听器留在流上。如果发生故障后重用流,这可能会导致事件侦听器泄漏和误吞的错误。

javascript node.js stream pipe pipeline
1个回答
0
投票

因此,我发现许多node.js流复合操作(例如pipeline().pipe())在错误处理方面确实很糟糕/不完整。例如,如果您只是这样做:

fs.createReadStream("input.txt")
  .pipe(fs.createWriteStream("output.txt"))
  .on('error', err => {
      console.log(err);
  }).on('finish', () => {
      console.log("all done");
  });

[您希望打开readStream时出现错误,您会在此处的错误处理程序中看到该错误,但“不”不是这样。打开输入文件时将发生错误。

您必须单独保存readStream对象,并直接将错误处理程序附加到该对象,以查看该错误。因此,我只是不再相信这种复合材料,而该文档也从不真正解释如何进行正确的错误处理。我尝试查看pipeline()的代码,以了解我是否能够理解错误处理,但这并没有证明是卓有成效的。]

所以,您的特定问题似乎可以通过转换流来完成:

const fs = require('fs');
const { Transform } = require('stream');

const myTransform = new Transform({
    transform: function(chunk, encoding, callback) {
        let str = chunk.toString('utf8');
        this.push(str.toUpperCase());
        callback();
    }
});

function upperFile(input, output) {
    return new Promise((resolve, reject) => {
        // common function for cleaning up a partial output file
        function errCleanup(err) {
            fs.unlink(output, function(e) {
                if (e) console.log(e);
                reject(err);
            });
        }

        let inputStream = fs.createReadStream(input, {encoding: 'utf8'});
        let outputStream = fs.createWriteStream(output, {emitClose: true});

        // have to separately listen for read/open errors
        inputStream.on("error", err => {
            // have to manually close writeStream when there was an error reading
            if (outputStream) outputStream.destroy();
            errCleanup(err);
        });
        inputStream.pipe(myTransform)
            .pipe(outputStream)
            .on("error", errCleanup)
            .on("close", resolve);        
    });
}

// sample usage
upperFile("input.txt", "output.txt").then(() => {
    console.log("all done");
}).catch(err => {
    console.log("got error", err);
});

如您所见,大约有2/3的代码以强大的方式处理错误。

© www.soinside.com 2019 - 2024. All rights reserved.