如何根据计数限制将可读流动态拆分为多个流

问题描述 投票:0回答:1

我有

Readable
流,它向另一个
Transform
流提供对象(这是来自
CsvFormatterStream<I, O> npm 包
@fast-csv/format
)。然后 csv 流通过管道传输到 fs
Writeable
流以写入 CSV 文件。请参阅帖子底部的示例。

我正在寻找一种解决方案,将

Readable
流中的对象拆分为
n
大小的块,因此它将生成最大数据行数 =
n
的 CSV 文件。

例如:

  • n
    = 10,
  • 可读流产生的对象总数 = 32

它应该生成 4 个 CSV 文件:3 个行数 = 10 的 CSV 文件和 1 个行数 = 2 的 CSV 文件。

我怎样才能实现这个目标?


UPD。我想要一个具有可配置

n
的自定义流,它将从另一个流接收对象并生成
y
可读流,其中
y
最大大小的块数
n

我已经实现了它,稍后当SO允许我这样做时会发布它。


示例:

先决条件:

  • npm 我@fast-csv/format
const { Readable } = require('node:stream');
const {format} = require('@fast-csv/format');
const fs = require('fs');
const path = require('path');
const { pipeline } = require('node:stream/promises');

(async () => {
  try {
    const readable = new Readable({ objectMode: true });
    const csvStream = format({ headers: true });
    const fileStream = fs.createWriteStream(path.join(process.cwd(), 'test.csv'), {
      flags: 'w',
    });

    const objects = createObjects(32);
    objects.forEach(obj => readable.push(obj));
    readable.push(null);

    await pipeline(readable, csvStream, fileStream);
    console.log('Success!');
  } catch (err) {
    console.error(err);
  }
})();

function createObjects(n) {
  const objects = [];

  for (let i = 0; i < n; i++) {
    const obj = createObject(i);
    objects.push(obj);
  }

  return objects;
}

function createObject(i) {
  return {
    id: i,
    name: `Obj #${i}`,
  };
}
node.js nodejs-stream fast-csv
1个回答
0
投票

可以通过实现

Transform
流来完成分割,该流将累积块并将它们作为另一个流释放。然后我们可以使用异步迭代器迭代结果。

SplitterStream类:

class SplitterStream extends Transform {
  constructor(limit, opts) {
    super({readableObjectMode: true, writableObjectMode: true});
    this.limit = limit;
    this.opts = opts;
    this.pushedCount = 0;
    this.currStream = new PassThrough(this.opts);
  }

  _transform(row, _encoding, callback) {
    this.currStream.write(row);
    this.pushedCount++;

    if (this.pushedCount === this.limit) {
      this.currStream.end();
      this.push(this.currStream);
      this.currStream = new PassThrough(this.opts);
      this.pushedCount = 0;
    }
    callback();
  }

  _flush(callback) {
    // If last rows were not pushed - push them before the end
    if (this.pushedCount > 0) {
      this.push(this.currStream);
      this.currStream = null;
      this.pushedCount = 0;
    }
    callback();
  }
}

使用示例。它按预期生成 4 个文件

(async () => {
  try {
    // Create and fill readable
    const readable = new Readable({ objectMode: true });
    const splitterStream = new SplitterStream(10, {readableObjectMode: true, writableObjectMode: true});

    const objects = createObjects(32);
    objects.forEach(obj => readable.push(obj));
    readable.push(null);

    readable.pipe(splitterStream);

    let i = 1;
    for await (const _readable of splitterStream) {
      const csvStream = format({ headers: true });
      const fileStream = fs.createWriteStream(path.join(process.cwd(), `test_${i++}.csv`), {
        flags: 'w',
      });

      await pipeline(_readable, csvStream, fileStream);
    }
    
    console.log('Success!');
  } catch (err) {
    console.error(err);
  }
})();
© www.soinside.com 2019 - 2024. All rights reserved.