为什么在 BYOB 模式下流关闭时 read 不返回

问题描述 投票:0回答:1

我正在 JS/浏览器代码中使用 ReadableStreams 并遇到以下问题。当使用

ReadableStreamBYOBReader
控制流读取的大小时,如果在读取请求期间关闭流而没有排队数据,则读取永远不会返回。请参阅下面的第一个片段。

使用

ReadableStreamDefaultReader
读取流时,最后一次读取确实会按预期返回。请参阅下面的第二个片段(片段中的流实现是相同的)。

考虑到这在 Chrome 和 FF 中以相同的方式挂起,这似乎是预期的行为。 但是如果是这样,当您不知道发出每个读取请求之前还剩多少数据时,如何关闭正在 BYOB 模式下读取的流? 在控制器上引发错误或取消流会导致触发返回,但两个都很丑。

我已经阅读了部分 WHATWG 规范,它没有直接说明在 BYOB 模式下流被关闭且没有数据块时应该发生什么

close steps (no chunk)
,但它确实说流应该在默认模式下关闭。

如果流被关闭,则该 promise 会以流中剩余的元素兑现,其数量可能少于最初请求的数量。如果未给出该参数,则只要至少有一个元素可用,promise 就会兑现。

下面的代码片段可以在 Chrome 和 FF 中运行,但不能在 Safari 中运行,因为 Safari 还没有完整的流 API。另外,请忽略

byobRequest
未使用且流读取未优化。我的目标是简化逻辑。

// Demonstrates the hang: more blocks are requested (readBlocks) than the
// stream produces (streamBlocks), so the last BYOB read is still pending
// when the stream closes.
const readBlocks = 4;
const streamBlocks = 3;
const blockSize = 1024;

const stream = makeStream(streamBlocks);
const reader = stream.getReader({ mode: "byob" });
let buffer = new Uint8Array(blockSize * readBlocks);

readAllBYOB(reader, buffer).then((result) => {
  const [blocks, done] = result;
  reader.releaseLock();
  // Add up the sizes of all views handed back by the reader.
  const byteLen = blocks.reduce((sum, block) => sum + block.byteLength, 0);
  console.log("all done, bytes read:", byteLen, done);
});

/**
 * Creates a byte stream for the demo. start() enqueues one block of "s" and
 * pull() enqueues blocks of "p" until at least `loops * blockSize` bytes have
 * been produced; the next pull() then closes the stream without enqueueing.
 *
 * Reads the outer `blockSize` binding.
 *
 * @param {number} loops - Multiplied by `blockSize` to get the byte budget.
 * @returns {ReadableStream} Stream backed by an underlying source of type "bytes".
 */
function makeStream(loops) {
  const streamSize = loops * blockSize;
  let bytesEmitted = 0;
  console.log("creating stream size:", streamSize);

  // Encodes one block of `fillChar`, updates the running total, logs, enqueues.
  // Any exception propagates to the caller's try/catch.
  const emitBlock = (controller, fillChar, logLabel) => {
    const chunk = new TextEncoder().encode(fillChar.repeat(blockSize));
    bytesEmitted += chunk.byteLength;
    console.log(logLabel, chunk.byteLength, bytesEmitted);
    controller.enqueue(chunk);
  };

  const source = {
    type: "bytes",

    async start(controller) {
      console.log(
        `stream start- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );

      try {
        emitBlock(controller, "s", "stream start- enqueuing, total:");
      } catch (err) {
        console.error("stream start- error, closing", err);
        controller.error(err);
      }
    },

    async pull(controller) {
      // The byobRequest object is only logged here; data is always enqueued.
      console.log(
        `stream pull- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );

      try {
        // Simulates a source that only learns it is out of data once a pull
        // arrives. With a BYOB reader the pending read does not settle after
        // this close() unless one of the following is done instead:
        //  1. enqueue data before calling close() (there is none to enqueue),
        //  2. call controller.error(),
        //  3. call stream.cancel().
        if (bytesEmitted >= streamSize) {
          console.log("stream pull- closing");
          controller.close();
          return;
        }

        emitBlock(controller, "p", "stream pull- enqueuing, total:");
      } catch (err) {
        console.error("stream pull- error, closing", err);
        controller.error(err);
      }
    },
  };

  return new ReadableStream(source);
}

/**
 * Keeps reading from `reader` until `output.byteLength` bytes have been
 * received, or a read reports `done` / returns no value.
 *
 * @param {ReadableStreamBYOBReader} reader - Object whose `read(view)` is awaited.
 * @param {Uint8Array} output - View used for the first read; its byteLength is the byte target.
 * @returns {Promise<[Uint8Array[], boolean]>} Every view returned by read(), plus the last `done` flag.
 */
async function readAllBYOB(reader, output) {
  const wantedBytes = output.byteLength;
  const chunks = [];
  let receivedBytes = 0;
  let finished = false;
  let view = output;
  console.log('readAllBYOB- start: ', wantedBytes);

  while (receivedBytes < wantedBytes) {
    console.log('readAllBYOB- try reading:', view.byteLength);

    // In the scenario this snippet demonstrates, this await never settles on
    // the final read, even though the stream has been closed.
    const { done: isDone, value: chunk } = await reader.read(view);
    console.log('readAllBYOB- read, done:', chunk?.byteLength, isDone);

    finished = isDone;
    if (!chunk) {
      break;
    }

    chunks.push(chunk);
    receivedBytes += chunk.byteLength;
    if (isDone) {
      break;
    }

    // The view just used is not reused; the next read gets a fresh buffer
    // sized to the bytes still missing.
    if (receivedBytes < wantedBytes) {
      view = new Uint8Array(wantedBytes - receivedBytes);
    }
  }

  console.log(
    'readAllBYOB- blocks, remainingBytes, done:',
    chunks.length,
    wantedBytes - receivedBytes,
    finished
  );

  return [chunks, finished];
}

// Works as expected: with a default reader the last read() settles once the
// stream closes.
const streamBlocks = 3;
const blockSize = 1024;

const stream = makeStream(streamBlocks);
const reader = stream.getReader();

readAll(reader).then((result) => {
  const [blocks, done] = result;
  reader.releaseLock();
  // Add up the sizes of all chunks handed back by the reader.
  const byteLen = blocks.reduce((sum, block) => sum + block.byteLength, 0);
  console.log("all done, bytes read:", byteLen, done);
});

/**
 * Creates a byte stream for the demo. start() enqueues one block of "s" and
 * pull() enqueues blocks of "p" until at least `loops * blockSize` bytes have
 * been produced; the next pull() then closes the stream without enqueueing.
 *
 * Reads the outer `blockSize` binding.
 *
 * @param {number} loops - Multiplied by `blockSize` to get the byte budget.
 * @returns {ReadableStream} Stream backed by an underlying source of type "bytes".
 */
function makeStream(loops) {
  const streamSize = loops * blockSize;
  let bytesEmitted = 0;
  console.log("creating stream size:", streamSize);

  // Encodes one block of `fillChar`, updates the running total, logs, enqueues.
  // Any exception propagates to the caller's try/catch.
  const emitBlock = (controller, fillChar, logLabel) => {
    const chunk = new TextEncoder().encode(fillChar.repeat(blockSize));
    bytesEmitted += chunk.byteLength;
    console.log(logLabel, chunk.byteLength, bytesEmitted);
    controller.enqueue(chunk);
  };

  const source = {
    type: "bytes",

    async start(controller) {
      console.log(
        `stream start- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );

      try {
        emitBlock(controller, "s", "stream start- enqueuing, total:");
      } catch (err) {
        console.error("stream start- error, closing", err);
        controller.error(err);
      }
    },

    async pull(controller) {
      // The byobRequest object is only logged here; data is always enqueued.
      console.log(
        `stream pull- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );

      try {
        // Simulates a source that only learns it is out of data once a pull
        // arrives. With a BYOB reader the pending read does not settle after
        // this close() unless one of the following is done instead:
        //  1. enqueue data before calling close() (there is none to enqueue),
        //  2. call controller.error(),
        //  3. call stream.cancel().
        if (bytesEmitted >= streamSize) {
          console.log("stream pull- closing");
          controller.close();
          return;
        }

        emitBlock(controller, "p", "stream pull- enqueuing, total:");
      } catch (err) {
        console.error("stream pull- error, closing", err);
        controller.error(err);
      }
    },
  };

  return new ReadableStream(source);
}

/**
 * Drains a default reader: awaits `reader.read()` repeatedly and collects
 * every returned chunk until a read reports `done` or yields no value.
 *
 * @param {ReadableStreamDefaultReader} reader - Object whose `read()` is awaited.
 * @returns {Promise<[Array, boolean]>} The collected chunks and the `done`
 *   flag from the last read.
 */
async function readAll(reader) {
  const blocks = [];
  let streamDone = false;
  console.log('readAll- start');

  for (;;) {
    console.log('readAll- try reading');

    // With a default reader this settles on the final read as well.
    const { done, value } = await reader.read();
    console.log('readAll- read, done:', value?.byteLength, done);

    streamDone = done;
    if (value) {
      blocks.push(value);
    }

    // Stop on end-of-stream, or when a read hands back nothing.
    if (done || !value) {
      break;
    }
  }

  console.log(
    'readAll- blocks, done:',
    blocks.length,
    streamDone
  );

  return [blocks, streamDone];
}

javascript browser whatwg-streams-api
1个回答
0
投票

看来您应该对 BYOB 请求作出响应。为此,您可以在调用 close() 之后,调用控制器的 .byobRequest 上的 respond() 方法,并将 0 作为 bytesWritten 传入。

// Requests more blocks (readBlocks) than the stream produces (streamBlocks),
// so the last BYOB read is issued after the data has run out.
const readBlocks = 4;
const streamBlocks = 3;
const blockSize = 1024;

const stream = makeStream(streamBlocks);
const reader = stream.getReader({ mode: "byob" });
let buffer = new Uint8Array(blockSize * readBlocks);

readAllBYOB(reader, buffer).then((result) => {
  const [blocks, done] = result;
  reader.releaseLock();
  // Add up the sizes of all views handed back by the reader.
  let byteLen = 0;
  blocks.forEach((block) => {
    byteLen += block.byteLength;
  });
  console.log("all done, bytes read:", byteLen, done);
});

/**
 * Creates a byte stream for the demo. start() enqueues one block of "s" and
 * pull() enqueues blocks of "p" until at least `loops * blockSize` bytes have
 * been produced; the next pull() closes the stream and answers any pending
 * BYOB request with zero bytes.
 *
 * Reads the outer `blockSize` binding.
 *
 * @param {number} loops - Multiplied by `blockSize` to get the byte budget.
 * @returns {ReadableStream} Stream backed by an underlying source of type "bytes".
 */
function makeStream(loops) {
  const streamSize = loops * blockSize;
  let bytesEmitted = 0;
  console.log("creating stream size:", streamSize);

  // Encodes one block of `fillChar`, updates the running total, logs, enqueues.
  // Any exception propagates to the caller's try/catch.
  const emitBlock = (controller, fillChar, logLabel) => {
    const chunk = new TextEncoder().encode(fillChar.repeat(blockSize));
    bytesEmitted += chunk.byteLength;
    console.log(logLabel, chunk.byteLength, bytesEmitted);
    controller.enqueue(chunk);
  };

  const source = {
    type: "bytes",

    async start(controller) {
      console.log(
        `stream start- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );

      try {
        emitBlock(controller, "s", "stream start- enqueuing, total:");
      } catch (err) {
        console.error("stream start- error, closing", err);
        controller.error(err);
      }
    },

    async pull(controller) {
      // Data is always enqueued; byobRequest is only used to finish the
      // final read after closing.
      console.log(
        `stream pull- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );

      try {
        // Simulates a source that only learns it is out of data once a pull
        // arrives.
        if (bytesEmitted >= streamSize) {
          console.log("stream pull- closing");
          controller.close();
          // close() alone leaves a pending BYOB read unsettled; responding
          // with 0 bytes after closing releases it. The optional call skips
          // this when byobRequest is null (e.g. with a default reader).
          controller.byobRequest?.respond(0);
          return;
        }

        emitBlock(controller, "p", "stream pull- enqueuing, total:");
      } catch (err) {
        console.error("stream pull- error, closing", err);
        controller.error(err);
      }
    },
  };

  return new ReadableStream(source);
}

/**
 * Keeps reading from `reader` until `output.byteLength` bytes have been
 * received, or a read reports `done` / returns no value.
 *
 * @param {ReadableStreamBYOBReader} reader - Object whose `read(view)` is awaited.
 * @param {Uint8Array} output - View used for the first read; its byteLength is the byte target.
 * @returns {Promise<[Uint8Array[], boolean]>} Every view returned by read(), plus the last `done` flag.
 */
async function readAllBYOB(reader, output) {
  const wantedBytes = output.byteLength;
  const chunks = [];
  let receivedBytes = 0;
  let finished = false;
  let view = output;
  console.log('readAllBYOB- start: ', wantedBytes);

  while (receivedBytes < wantedBytes) {
    console.log('readAllBYOB- try reading:', view.byteLength);

    // The final read settles only if the source answers the pending
    // byobRequest when it closes the stream.
    const { done: isDone, value: chunk } = await reader.read(view);
    console.log('readAllBYOB- read, done:', chunk?.byteLength, isDone);

    finished = isDone;
    if (!chunk) {
      break;
    }

    chunks.push(chunk);
    receivedBytes += chunk.byteLength;
    if (isDone) {
      break;
    }

    // The view just used is not reused; the next read gets a fresh buffer
    // sized to the bytes still missing.
    if (receivedBytes < wantedBytes) {
      view = new Uint8Array(wantedBytes - receivedBytes);
    }
  }

  console.log(
    'readAllBYOB- blocks, remainingBytes, done:',
    chunks.length,
    wantedBytes - receivedBytes,
    finished
  );

  return [chunks, finished];
}

© www.soinside.com 2019 - 2024. All rights reserved.