使用 Vanilla JS 从 ReadableStream 解析 JSON 数据块

问题描述 投票:0回答:2

我正在获取一个大的 JSON 文件(200mb),我想在数据流入时渲染该数据。我遇到的问题是,在解码和解析流式传输给我的块后,返回语法错误在控制台中对我来说

Unexpected end of JSON input
。我想要做的是解析返回块并在获得数据后立即对其进行处理。但是,由于 ReadableStream 以块的形式进行流式传输,而这些块的切片方式是不可预测的,因此我无法对返回值执行 JSON.parse() 。需要进行什么样的数据处理才能实现这一点?有更好的方法吗?

这是我的代码:

const decoder = new TextDecoder('utf-8')
fetch("../files/response.json")
    .then(response => {
        const reader = response.body.getReader()
        new ReadableStream({
            start(controller) {
                function enqueueValues() {
                    reader.read()
                    .then(({ done, value }) => {
                        if (done) {
                            controller.close() // stream is complete
                            return
                        }
                        var decodedValue = decoder.decode(value) // one chunk of invalid json data in string format

                        console.log(JSON.parse(decodedValue)) // json syntax error

                        // do something with the json value here

                        controller.enqueue(value)

                        enqueueValues() // run until all data has been streamed
                    })
                }
                enqueueValues()
            }
        })
    })
javascript json stream
2个回答
1
投票

我认为实现这一点的唯一方法是在每个块中发送有效的 json 数据(对象、数组)。

这是一个示例express.js 处理程序:

app.get("/stream", (req, res) => {
  let i = 0;
  const interval = setInterval((a) => {
    i += 1;
    res.write(JSON.stringify([{ message: `Chunk ${i}` }]));
  }, 500);

  setTimeout(() => {
    clearInterval(interval);
    res.end(() => {
      console.log("End");
    });
  }, 5000);
});

这样做的缺点是最终的 json(所有块连接成一个字符串)无效。但是在浏览器内存中保存 200mb 对象也不好。

更新: 我试图解决我的项目中的类似问题并找到了解决方法。

  • 简单地将我的所有块(对象)包装到一个数组中
  • 将左方括号和右方括号作为单独的块发送
  • 在每个数据块的末尾添加逗号。

然后在客户端上,我忽略等于

[
]
的块,并剪切每个数据块中的结尾逗号。

服务器:

app.get("/stream", (req, res) => {
  let i = 0,
    chunkString;
  res.write("["); // <<---- OPENING bracket
  const interval = setInterval((a) => {
    i += 1;
    chunkString = JSON.stringify({ message: `Chunk ${i}` });
    res.write(`${chunkString},`);   // <<----- Note ending comma at the end of each data chunk
  }, 500);

  setTimeout(() => {
    clearInterval(interval);
    res.end("]", () => {. // <<---- CLOSING bracket
      console.log("End");
    });
  }, 5000);
});

客户:

const decoder = new TextDecoder("utf-8");

const handleJsonChunk = (jsonChunk) => {
  console.log("Received Json Chunk: ", jsonChunk);
};

const main = async () => {
  const response = await fetch("http://localhost:3000/stream");
  const reader = response.body.getReader();
  const skipValues = ["[", "]"];

  const work = (reader) => {
    reader.read().then(({ done, value }) => {
      if (!done) {
        let stringValue = decoder.decode(value);
        const skip = skipValues.indexOf(stringValue) >= 0;
        if (skip) return work(reader);

        if (stringValue.endsWith(","))
          stringValue = stringValue.substr(0, stringValue.length - 1);

        try {
          const jsonValue = JSON.parse(stringValue);
          handleJsonChunk(jsonValue);
        } catch (error) {
          console.log(`Failed to parse chunk. Error: ${error}`);
        }

        work(reader);
      }
    });
  };
  work(reader);
};

main();

0
投票

您需要的是一个支持流式传输的 JSON 解析器。有很多库,其中之一是 json-stream-es,我维护它。

从您提供的描述来看,数据似乎以 JSONL 格式到达,这意味着数据不是单个 JSON 文档,而是多个文档(通常由换行符分隔)。

您可以使用 json-stream-es 解析这样的流,如下所示:

import { stringifyMultiJsonStream, streamToIterable } from "json-stream-es";

const response = await fetch("../files/response.json");
const values = response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(parseJsonStream(undefined, { multi: true }));

for await (const decodedValue of streamToIterable(values)) {
    console.log(decodedValue);

    // do something with the json value here
}
© www.soinside.com 2019 - 2024. All rights reserved.