假设我有一个 ReadableStream 对象(来自 Streams API Web 标准):
let readableStream = (await fetch('http://example.com')).body;
我知道我可以逐块读取它,其中块大小通常取决于网络:
for await (let chunk of readableStream) {
let chunkString = new TextDecoder().decode(chunk);
console.dir(chunkString);
}
但是如何逐行读取 ReadableStream 呢?
(请注意,行可以跨越多个块,因此仅分割每个块是不够的。)
由于换行符本身由单个字节组成(因此不能分布在多个块中),因此使用 TransformStream 可以很容易地实现:
function concatArrayBuffers(chunks: Uint8Array[]): Uint8Array {
const result = new Uint8Array(chunks.reduce((a, c) => a + c.length, 0));
let offset = 0;
for (const chunk of chunks) {
result.set(chunk, offset);
offset += chunk.length;
}
return result;
}
class LineSplitter extends TransformStream<Uint8Array, Uint8Array> {
protected _buffer: Uint8Array[] = [];
constructor() {
super({
transform: (chunk, controller) => {
let index;
let rest = chunk;
while ((index = rest.indexOf(0x0a)) !== -1) {
controller.enqueue(concatArrayBuffers([...this._buffer, rest.slice(0, index + 1)]));
rest = rest.slice(index + 1);
this._buffer = [];
}
if (rest.length > 0) {
this._buffer.push(rest);
}
},
flush: (controller) => {
if (this._buffer.length > 0) {
controller.enqueue(concatArrayBuffers(this._buffer));
}
}
});
}
}
const linesStream = (await fetch('http://example.com')).body
.pipeThrough(new LineSplitter());
for await (const line of linesStream.pipeThrough(new StringDecoderStream())) {
console.log(line);
}
此示例使用内部缓冲区重新排列块,并为每一行发出一个块(包括末尾的换行符)。