Splitting a ReadableStream into lines


Suppose I have a ReadableStream object (from the Streams API web standard):

let readableStream = (await fetch('http://example.com')).body;

I know I can read it chunk by chunk, where the chunk size usually depends on the network:

for await (let chunk of readableStream) {
  let chunkString = new TextDecoder().decode(chunk);
  console.dir(chunkString);
}
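A side note on the loop above: creating a fresh TextDecoder for every chunk can garble a multi-byte character that happens to be cut by a chunk boundary. The following is only a minimal sketch of that effect, with a hand-made split of the "€" character standing in for real network chunks; reusing one decoder with the `stream: true` option keeps the pending bytes between calls.

```typescript
// "€" is three bytes in UTF-8; pretend the network cut it after the first byte.
const euroBytes = new TextEncoder().encode("€");
const firstPart = euroBytes.slice(0, 1);
const secondPart = euroBytes.slice(1);

// A new decoder per chunk: each half is invalid on its own and turns into U+FFFD.
const perChunk =
    new TextDecoder().decode(firstPart) + new TextDecoder().decode(secondPart);

// One shared decoder in streaming mode: incomplete bytes are held until the rest arrives.
const sharedDecoder = new TextDecoder();
const streamed =
    sharedDecoder.decode(firstPart, { stream: true }) +
    sharedDecoder.decode(secondPart, { stream: true });

console.log(perChunk === "€"); // false
console.log(streamed === "€"); // true
```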

But how can I read a ReadableStream line by line?

(Note that a line can span multiple chunks, so simply splitting each chunk on its own is not enough.)
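To make the chunk-boundary problem concrete, here is a small sketch on plain strings. The chunk contents and the helper names `naiveSplit` and `splitWithCarry` are made up for illustration; they are not part of any API. The second helper shows the general idea any solution needs: carry the unterminated tail over to the next chunk.

```typescript
// Hypothetical chunks: the line "hello world" is cut in the middle.
const sampleChunks: string[] = ["first\nhello ", "world\nlast\n"];

// Naive approach: split every chunk independently.
function naiveSplit(parts: string[]): string[] {
    const lines: string[] = [];
    for (const part of parts) {
        for (const piece of part.split("\n")) {
            if (piece.length > 0) lines.push(piece);
        }
    }
    return lines;
}

// Carry-over approach: keep the unterminated tail and prepend it to the next chunk.
function splitWithCarry(parts: string[]): string[] {
    const lines: string[] = [];
    let carry = "";
    for (const part of parts) {
        const pieces = (carry + part).split("\n");
        carry = pieces.pop() ?? ""; // the last piece has no newline yet
        lines.push(...pieces);
    }
    if (carry.length > 0) lines.push(carry); // final line without a trailing newline
    return lines;
}

console.log(naiveSplit(sampleChunks));     // [ 'first', 'hello ', 'world', 'last' ]
console.log(splitWithCarry(sampleChunks)); // [ 'first', 'hello world', 'last' ]
```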

whatwg-streams-api
1 answer

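Because the line feed character is a single byte (0x0a) in UTF-8 and other ASCII-compatible encodings, it can never be split across two chunks, so this is easy to implement with a TransformStream that works directly on the bytes: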

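// Concatenates several byte chunks into one contiguous Uint8Array.
function concatArrayBuffers(chunks: Uint8Array[]): Uint8Array {
    const result = new Uint8Array(chunks.reduce((a, c) => a + c.length, 0));
    let offset = 0;
    for (const chunk of chunks) {
        result.set(chunk, offset);
        offset += chunk.length;
    }
    return result;
}

// Re-chunks a byte stream so that every emitted chunk is exactly one line,
// including its trailing line feed (0x0a).
class LineSplitter extends TransformStream<Uint8Array, Uint8Array> {
    constructor() {
        // Pieces of the current, not yet terminated line. This is a closure variable
        // rather than a class field because TypeScript rejects `this` inside the
        // arguments of super().
        let buffer: Uint8Array[] = [];

        super({
            transform: (chunk, controller) => {
                let index: number;
                let rest = chunk;
                // Emit one chunk per line feed found in the incoming data.
                while ((index = rest.indexOf(0x0a)) !== -1) {
                    controller.enqueue(concatArrayBuffers([...buffer, rest.slice(0, index + 1)]));
                    rest = rest.slice(index + 1);
                    buffer = [];
                }

                // Keep the unterminated tail until the next chunk arrives.
                if (rest.length > 0) {
                    buffer.push(rest);
                }
            },
            flush: (controller) => {
                // The stream ended without a final line feed: emit what is left.
                if (buffer.length > 0) {
                    controller.enqueue(concatArrayBuffers(buffer));
                }
            }
        });
    }
}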

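// `body` is typed as nullable, hence the non-null assertion.
const linesStream = (await fetch('http://example.com')).body!
    .pipeThrough(new LineSplitter());

// TextDecoderStream is the standard streaming decoder (UTF-8 by default).
for await (const line of linesStream.pipeThrough(new TextDecoderStream())) {
    console.log(line);
}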

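This example uses an internal buffer to reassemble the chunks and emits one chunk per line (including the trailing newline character). If the stream ends without a final newline, the remaining bytes are emitted as a last chunk when the stream is flushed.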
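The single-byte argument this answer relies on can be checked quickly, assuming the data is UTF-8 (it would not hold for UTF-16, for example). In UTF-8 every byte of a multi-byte character has its high bit set, so the value 0x0a only ever appears as a real line feed. The sample string below is arbitrary:

```typescript
const utf8 = new TextEncoder(); // TextEncoder always produces UTF-8

// Arbitrary sample with 2-, 3- and 4-byte characters.
const multiByteBytes = utf8.encode("é€😀");

// Every byte of a multi-byte sequence is >= 0x80, so none can equal 0x0a.
const allHighBit = multiByteBytes.every((b) => b >= 0x80);

// The line feed itself is exactly one byte.
const lineFeedBytes = utf8.encode("\n");

console.log(multiByteBytes.length, allHighBit);     // 9 true
console.log(lineFeedBytes.length, lineFeedBytes[0]); // 1 10
```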
