如何在Stream.js中使用stream.Writable的drain事件

问题描述 投票:11回答:3

在Node.js中,我使用fs.createWriteStream方法将数据附加到本地文件。在Node文档中,他们在使用drain时提到了fs.createWriteStream事件,但我不明白。

var stream = fs.createWriteStream('fileName.txt');
var result = stream.write(data);

在上面的代码中,我如何使用drain事件?事件在下面正确使用了吗?

var data = 'this is my data';
if (!streamExists) {
  var stream = fs.createWriteStream('fileName.txt');
}

var result = stream.write(data);
if (!result) {
  stream.once('drain', function() {
    stream.write(data);
  });
}
javascript node.js stream
3个回答
23
投票

drain事件用于清空可写流的内部缓冲区。

只有当内部缓冲区的大小超过其highWaterMark属性时才会发生这种情况,var fs = require('fs'); var read = fs.createReadStream('./read'); var write = fs.createWriteStream('./write'); 属性是可以存储在可写流的内部缓冲区内的数据的最大字节数,直到它停止从数据源读取为止。

造成这种情况的原因可能是由于设置涉及从一个流中读取数据源的速度比可以写入另一个资源的速度快。例如,采取两个流:

read

现在假设文件write在SSD上,可以500MB / s读取,150MB/s在硬盘上,只能在highWaterMark上写入。写入流将无法跟上,并将开始将数据存储在内部缓冲区中。一旦缓冲区达到false(默认为16KB),写入将开始返回drain,并且流将在内部排队。一旦内部缓冲区的长度为0,则触发if (state.length === 0 && state.needDrain) { state.needDrain = false; stream.emit('drain'); } 事件。

这是排水工作的方式:

writeOrBuffer

这些是排水的先决条件,它是var ret = state.length < state.highWaterMark; state.needDrain = !ret; 功能的一部分:

drain

要了解如何使用function writeOneMillionTimes(writer, data, encoding, callback) { var i = 1000000; write(); function write() { var ok = true; do { i -= 1; if (i === 0) { // last time! writer.write(data, encoding, callback); } else { // see if we should continue, or wait // don't pass the callback, because we're not done yet. ok = writer.write(data, encoding); } } while (i > 0 && ok); if (i > 0) { // had to stop early! // write some more once it drains writer.once('drain', write); } } } 事件,请参阅Node.js文档中的示例。

ok

该函数的目标是将1,000,000次写入可写流。变量ok设置为true,循环仅在ok为真时执行。对于每个循环迭代,stream.write()的值设置为drain的值,如果需要ok,则返回false。如果drain变为false,那么drain的事件处理程序会等待,然后着火,恢复写入。


特别是关于您的代码,您不需要使用drain事件,因为您在打开流后只编写了一次。由于您尚未向流中写入任何内容,因此内部缓冲区为空,您必须以块的形式写入至少16KB才能触发drain事件。 highWaterMark事件用于写入多次数据,而不是可写流的Stream.write设置。


8
投票

想象一下,您正在连接具有不同带宽的2个流,例如,将本地文件上载到慢速服务器。 (快速)文件流将比(慢)套接字流消耗它更快地发出数据。

在这种情况下,node.js会将数据保存在内存中,直到慢流有机会处理它。如果文件非常大,这可能会出现问题。

为了避免这种情况,当底层系统缓冲区已满时,false会返回drain。如果您停止写入,流将稍后发出pause/resume事件以指示系统缓冲区已清空,并且再次写入是合适的。

您可以使用readable.pipe(writable)可读流并控制可读流的带宽。

更好:你可以使用write来为你做这件事。

编辑:您的代码中存在错误:无论data返回什么,您的数据都已写入。您无需重试它。在你的情况下,你写两次var packets = […], current = -1; function niceWrite() { current += 1; if (current === packets.length) return stream.end(); var nextPacket = packets[current], canContinue = stream.write(nextPacket); // wait until stream drains to continue if (!canContinue) stream.once('drain', niceWrite); else niceWrite(); }

像这样的东西会起作用:

const write = (writer, data) => {
  return new Promise((resolve) => {
    if (!writer.write(data)) {
      writer.once('drain', resolve)
    }
    else {
      resolve()
    }
  })
}

// usage
const run = async () => {
  const write_stream = fs.createWriteStream('...')
  const max = 1000000
  let current = 0
  while (current <= max) {
    await write(write_stream, current++)
  }
}

1
投票

这是一个带有async / await的版本

https://gist.github.com/stevenkaspar/509f792cbf1194f9fb05e7d60a1fbc73

promise


0
投票

这是使用Promises(async / await)的速度优化版本。调用者必须检查它是否得到await,只有在那种情况下才需要调用const write = (writer, data) => { // return a promise only when we get a drain if (!writer.write(data)) { return new Promise((resolve) => { writer.once('drain', resolve) }) } } // usage const run = async () => { const write_stream = fs.createWriteStream('...') const max = 1000000 let current = 0 while (current <= max) { const promise = write(write_stream, current++) // since drain happens rarely, awaiting each write call is really slow. if (promise) { // we got a drain event, therefore we wait await promise } } } 。等待每次通话可以使程序减慢3倍...

qazxswpoi
© www.soinside.com 2019 - 2024. All rights reserved.