我有
Readable
流,它向另一个 Transform
流提供对象(这是来自 CsvFormatterStream<I, O>
npm 包的
@fast-csv/format
）。然后 csv 流通过管道传输到 fs Writable
流以写入 CSV 文件。请参阅帖子底部的示例。
我正在寻找一种解决方案,将
Readable
流中的对象拆分为 n
大小的块,因此它将生成最大数据行数 = n
的 CSV 文件。
例如:
n
= 10,它应该生成 4 个 CSV 文件:3 个行数 = 10 的 CSV 文件和 1 个行数 = 2 的 CSV 文件。
我怎样才能实现这个目标?
UPD。我想要一个具有可配置
n
的自定义流,它将从另一个流接收对象,并生成 y 个可读流,其中每个可读流是一个最大大小为 n 的块。
我已经实现了它,稍后当SO允许我这样做时会发布它。
先决条件:
const { Readable } = require('node:stream');
const {format} = require('@fast-csv/format');
const fs = require('fs');
const path = require('path');
const { pipeline } = require('node:stream/promises');
(async () => {
  try {
    // Readable.from() builds an object-mode source with a proper _read()
    // implementation and built-in backpressure; the original pattern of
    // manually push()-ing into a bare `new Readable({ objectMode: true })`
    // relies on push(null) being called before any read ever triggers the
    // (unimplemented) _read().
    const readable = Readable.from(createObjects(32));
    const csvStream = format({ headers: true });
    const fileStream = fs.createWriteStream(path.join(process.cwd(), 'test.csv'), {
      flags: 'w',
    });
    // source -> CSV formatter -> file; pipeline() wires error propagation
    // and cleanup across all three streams.
    await pipeline(readable, csvStream, fileStream);
    console.log('Success!');
  } catch (err) {
    console.error(err);
  }
})();
/**
 * Build an array of n sample row objects for the CSV demo.
 * @param {number} n - how many rows to generate
 * @returns {Array<{id: number, name: string}>} rows with sequential ids
 */
function createObjects(n) {
  return Array.from({ length: n }, (_, i) => ({ id: i, name: `Obj #${i}` }));
}
/**
 * Build one sample row object.
 * @param {number} i - row index, used as the id and embedded in the name
 * @returns {{id: number, name: string}} the row
 */
function createObject(i) {
  const name = `Obj #${i}`;
  return { id: i, name };
}
可以通过实现
Transform
流来完成分割,该流将累积块并将它们作为另一个流释放。然后我们可以使用异步迭代器迭代结果。
SplitterStream类:
/**
 * Transform stream that groups incoming objects into sub-streams of at
 * most `limit` items each. Operates in object mode on both sides: every
 * value it pushes downstream is an *ended* PassThrough stream holding up
 * to `limit` of the original rows, ready to be piped independently.
 */
class SplitterStream extends Transform {
  /**
   * @param {number} limit - maximum number of rows per emitted sub-stream
   * @param {object} [opts] - options forwarded to each PassThrough chunk
   *   stream (pass e.g. `{ objectMode: true }` for object rows)
   */
  constructor(limit, opts) {
    super({ readableObjectMode: true, writableObjectMode: true });
    this.limit = limit;
    this.opts = opts;
    this.pushedCount = 0;
    this.currStream = new PassThrough(this.opts);
  }

  _transform(row, _encoding, callback) {
    // Rows accumulate inside the current PassThrough's buffer; `limit`
    // bounds how many collect before the sub-stream is handed downstream.
    this.currStream.write(row);
    this.pushedCount++;
    if (this.pushedCount === this.limit) {
      // Group complete: seal it, emit it, start a fresh one.
      this.currStream.end();
      this.push(this.currStream);
      this.currStream = new PassThrough(this.opts);
      this.pushedCount = 0;
    }
    callback();
  }

  _flush(callback) {
    // Emit any leftover rows as a final, shorter sub-stream.
    if (this.pushedCount > 0) {
      // BUG FIX: the trailing sub-stream must be end()-ed too, otherwise a
      // consumer awaiting its completion (e.g. pipeline()) hangs forever.
      this.currStream.end();
      this.push(this.currStream);
    }
    this.currStream = null;
    this.pushedCount = 0;
    callback();
  }
}
使用示例。它按预期生成 4 个文件
(async () => {
  try {
    // Readable.from() gives a well-formed object-mode source (real _read()
    // plus backpressure) instead of manual push()/push(null) bookkeeping.
    const readable = Readable.from(createObjects(32));
    const splitterStream = new SplitterStream(10, { readableObjectMode: true, writableObjectMode: true });

    // pipeline() instead of .pipe(): .pipe() does not propagate source
    // errors to the destination. Start feeding now, await it after all
    // sub-streams have been consumed.
    const feeding = pipeline(readable, splitterStream);
    // Pre-attach a no-op handler so a feed failure while we are inside the
    // loop below does not surface as an unhandled rejection; the real error
    // still reaches the catch block via `await feeding`.
    feeding.catch(() => {});

    let i = 1;
    for await (const _readable of splitterStream) {
      const csvStream = format({ headers: true });
      const fileStream = fs.createWriteStream(path.join(process.cwd(), `test_${i++}.csv`), {
        flags: 'w',
      });
      // Each sub-stream becomes its own CSV file (test_1.csv, test_2.csv, ...).
      await pipeline(_readable, csvStream, fileStream);
    }
    await feeding;
    console.log('Success!');
  } catch (err) {
    console.error(err);
  }
})();