如何在Node.js的管道流中发生错误事件后恢复?

问题描述 投票:4回答:2

我在MyWritableStream中发出错误事件后,数据传输停止。我需要做什么来恢复数据传输?

var readable = fs.createReadStream('test.txt');
var writable = new MyWritableStream();

writable.on('error', function(error) {
    console.log('error', error);
    // How i can resume?
});

writable.on('finish', function(){
    console.log('finished');
})

readable.pipe(writable);
node.js stream
2个回答
1
投票

我不确定,这是否是正常的做法,但我暂时看不到其他解决方案,它对我有用。如果您可以提出更准确的解决方案,请执行此操作。

我们可以使用可写的pipe事件跟踪可读流实例:

function WriteableStream(options) {
    Writable.call(this, options);

    this.source = null;

    var instance = this;

    this.on('pipe', function(source){
        instance.source = source;
    });
}
util.inherits(WriteableStream, Writable);

因此,当我们发出错误事件,并且可读流自动取消管道传输时,我们可以自行对其进行管道传输:

WriteableStream.prototype._write = function(chunk, encoding, done) {
    this.emit('error', new Error('test')); // unpipes readable
    done();
};

WriteableStream.prototype.resume = function() {
    this.source.pipe(this); // re-pipes readable
}

最后,我们将通过以下方式使用它:

var readable = fs.createReadStream(file);
var writeable = new WriteableStream();

writeable.on('error', function(error) {
    console.log('error', error);
    writeable.resume();
});

readable.pipe(writeable);

0
投票

我知道这个问题很旧,但是您可能想看看https://github.com/miraclx/xresilient

出于同样的原因,我构建了这个(对于可搜索的流效果最佳。)>

您定义了一个返回可读流的函数,该库将测量直到遇到错误为止所经过的字节数。

一旦可读流遇到error事件,它将使用读取的字节数调用已定义的函数,以便您可以为流源建立索引。

示例:

const fs = require('fs');
const xresilient = require('xresilient');

const readable = xresilient(({bytesRead}) => {
  return generateSeekableStreamSomehow({start: bytesRead});
}, {retries: 5});

const writable = fs.createWriteStream('file.test');

readable.pipe(writable);
  • 文件流可通过start功能的fs.createReadStream()选项建立索引。
  • HTTP请求可通过Range HTTP标头建立索引。
  • 检查出来。Range

© www.soinside.com 2019 - 2024. All rights reserved.