我正在 JS/浏览器代码中使用 ReadableStreams 并遇到以下问题。当使用
ReadableStreamBYOBReader
控制流读取的大小时,如果在读取请求期间关闭流而没有排队数据,则读取永远不会返回。请参阅下面的第一个片段。
使用
ReadableStreamDefaultReader
读取流时,最后一次读取确实会按预期返回。请参阅下面的第二个片段(片段中的流实现是相同的)。
考虑到这在 Chrome 和 FF 中以相同的方式挂起,这似乎是预期的行为。 但是如果是这样,当您不知道发出每个读取请求之前还剩多少数据时,如何关闭正在 BYOB 模式下读取的流? 在控制器上引发错误或取消流会导致触发返回,但两个都很丑。
我已经阅读了部分 WHATWG 规范,它没有直接说明在 BYOB 模式下流被关闭且没有数据块时应该发生什么(参见
close steps (no chunk)
),但它确实说明了在默认模式下流应当被关闭。
如果流被关闭,该 promise 将以流中剩余的元素兑现,其数量可能少于最初请求的数量。如果未指定该参数,则 promise 会在至少有一个元素可用时兑现。
下面的代码片段可以在 Chrome 和 Firefox 中运行,但无法在 Safari 中运行,因为 Safari 尚未完整实现 Streams API。另外,请忽略
byobRequest
未被使用以及流读取未经优化这两点;我的目的是让逻辑尽量简单。
// Demo driver: asks for more blocks (readBlocks) than the stream will ever
// produce (streamBlocks), so the last BYOB read is issued against a stream
// that closes without enqueuing data. Reported to hang on that final read.
const readBlocks = 4;
const streamBlocks = 3;
const blockSize = 1024;

const stream = makeStream(streamBlocks);
const reader = stream.getReader({ mode: "byob" });
let buffer = new Uint8Array(blockSize * readBlocks);

readAllBYOB(reader, buffer).then(([blocks, done]) => {
  reader.releaseLock();
  // Sum the sizes of every view that was handed back by the reader.
  const byteLen = blocks.reduce((sum, block) => sum + block.byteLength, 0);
  console.log("all done, bytes read:", byteLen, done);
});
/**
 * Builds a demo readable byte stream.
 *
 * `start` enqueues one block of `blockSize` bytes filled with "s"; every
 * `pull` enqueues one more block filled with "p" until at least
 * `loops * blockSize` bytes have been produced. The next `pull` then closes
 * the controller without enqueuing anything.
 *
 * `blockSize` is read from the enclosing scope.
 *
 * @param {number} loops - Number of blocks after which the stream closes.
 * @returns {ReadableStream} A stream created with `type: "bytes"`.
 */
function makeStream(loops) {
  let emittedBytes = 0;
  console.log("creating stream size:", loops * blockSize);

  const byteSource = {
    type: "bytes",

    async start(controller) {
      console.log(
        `stream start- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );
      try {
        const chunk = new TextEncoder().encode("s".repeat(blockSize));
        emittedBytes += chunk.byteLength;
        console.log("stream start- enqueuing, total:", chunk.byteLength, emittedBytes);
        controller.enqueue(chunk);
      } catch (failure) {
        console.error("stream start- error, closing", failure);
        controller.error(failure);
      }
    },

    async pull(controller) {
      // The byobRequest object is only logged here; it is never filled or
      // responded to.
      console.log(
        `stream pull- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );
      try {
        // Pretend the source only learns it is out of data once a read has
        // been requested. Per the original author, a pending BYOB read never
        // settles after this bare close() unless one of these is done:
        //   1. enqueue data before calling close (none is available here),
        //   2. call controller.error() (ugly),
        //   3. call stream.cancel() (also seems wrong).
        if (emittedBytes >= blockSize * loops) {
          console.log("stream pull- closing");
          controller.close();
        } else {
          const chunk = new TextEncoder().encode("p".repeat(blockSize));
          emittedBytes += chunk.byteLength;
          console.log("stream pull- enqueuing, total:", chunk.byteLength, emittedBytes);
          controller.enqueue(chunk);
        }
      } catch (failure) {
        console.error("stream pull- error, closing", failure);
        controller.error(failure);
      }
    },
  };

  return new ReadableStream(byteSource);
}
/**
 * Reads from a BYOB reader until `output.byteLength` bytes have been
 * collected, the reader reports `done`, or a read yields no value.
 *
 * The first read uses `output`; every later read uses a freshly allocated
 * Uint8Array sized to the bytes still missing. Each returned view is kept as
 * its own entry (including a zero-length view delivered together with
 * `done`).
 *
 * @param {{ read(view: Uint8Array): Promise<{ done: boolean, value?: Uint8Array }> }} reader
 *   BYOB-style reader.
 * @param {Uint8Array} output - View used for the first read; its length is
 *   the total number of bytes wanted.
 * @returns {Promise<[Uint8Array[], boolean]>} The collected views and the
 *   `done` flag of the last read (false if no read was issued).
 */
async function readAllBYOB(reader, output) {
  const wantedBytes = output.byteLength;
  const collected = [];
  let receivedBytes = 0;
  let finished = false;
  let view = output;

  console.log('readAllBYOB- start: ', wantedBytes);

  while (receivedBytes < wantedBytes) {
    console.log('readAllBYOB- try reading:', view.byteLength);
    // In the demo this call is the one reported never to settle once the
    // stream has been closed without data.
    const { done: ended, value: chunk } = await reader.read(view);
    console.log('readAllBYOB- read, done:', chunk?.byteLength, ended);

    finished = ended;
    if (chunk) {
      collected.push(chunk);
      receivedBytes += chunk.byteLength;
    }
    if (ended || !chunk) {
      break;
    }
    // The previous view was handed to the stream; allocate a new one for
    // whatever is still missing.
    if (receivedBytes < wantedBytes) {
      view = new Uint8Array(wantedBytes - receivedBytes);
    }
  }

  console.log(
    'readAllBYOB- blocks, remainingBytes, done:',
    collected.length,
    wantedBytes - receivedBytes,
    finished
  );
  return [collected, finished];
}
// Demo driver for the default reader: the same stream is drained with
// reader.read() and the final read settles with done = true as expected.
const streamBlocks = 3;
const blockSize = 1024;

const stream = makeStream(streamBlocks);
const reader = stream.getReader();

readAll(reader).then(([blocks, done]) => {
  reader.releaseLock();
  // Sum the sizes of every chunk that was read.
  const byteLen = blocks.reduce((sum, block) => sum + block.byteLength, 0);
  console.log("all done, bytes read:", byteLen, done);
});
/**
 * Builds a demo readable byte stream (same implementation as in the BYOB
 * snippet, here consumed through a default reader).
 *
 * `start` enqueues one block of `blockSize` bytes filled with "s"; every
 * `pull` enqueues one more block filled with "p" until at least
 * `loops * blockSize` bytes have been produced. The next `pull` then closes
 * the controller without enqueuing anything.
 *
 * `blockSize` is read from the enclosing scope.
 *
 * @param {number} loops - Number of blocks after which the stream closes.
 * @returns {ReadableStream} A stream created with `type: "bytes"`.
 */
function makeStream(loops) {
  let emittedBytes = 0;
  console.log("creating stream size:", loops * blockSize);

  const byteSource = {
    type: "bytes",

    async start(controller) {
      console.log(
        `stream start- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );
      try {
        const chunk = new TextEncoder().encode("s".repeat(blockSize));
        emittedBytes += chunk.byteLength;
        console.log("stream start- enqueuing, total:", chunk.byteLength, emittedBytes);
        controller.enqueue(chunk);
      } catch (failure) {
        console.error("stream start- error, closing", failure);
        controller.error(failure);
      }
    },

    async pull(controller) {
      // The byobRequest object is only logged here; it is never filled or
      // responded to.
      console.log(
        `stream pull- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );
      try {
        // Pretend the source only learns it is out of data once a read has
        // been requested. Per the original author, in BYOB mode a pending
        // read never settles after this bare close() unless one of these is
        // done:
        //   1. enqueue data before calling close (none is available here),
        //   2. call controller.error() (ugly),
        //   3. call stream.cancel() (also seems wrong).
        if (emittedBytes >= blockSize * loops) {
          console.log("stream pull- closing");
          controller.close();
        } else {
          const chunk = new TextEncoder().encode("p".repeat(blockSize));
          emittedBytes += chunk.byteLength;
          console.log("stream pull- enqueuing, total:", chunk.byteLength, emittedBytes);
          controller.enqueue(chunk);
        }
      } catch (failure) {
        console.error("stream pull- error, closing", failure);
        controller.error(failure);
      }
    },
  };

  return new ReadableStream(byteSource);
}
/**
 * Drains a reader by calling `read()` with no arguments until it reports
 * `done` or yields no value, collecting every chunk it returns.
 *
 * @param {{ read(): Promise<{ done: boolean, value?: Uint8Array }> }} reader
 *   Default-mode reader.
 * @returns {Promise<[Uint8Array[], boolean]>} The collected chunks and the
 *   `done` flag of the last read.
 */
async function readAll(reader) {
  const blocks = [];
  let streamDone = false;
  console.log('readAll- start');
  while (true) {
    console.log('readAll- try reading');
    const { done, value } = await reader.read();
    console.log('readAll- read, done:', value?.byteLength, done);
    streamDone = done;
    if (value) {
      blocks.push(value);
    }
    // Stop on end-of-stream, or defensively when a read yields no value.
    if (done || !value) {
      break;
    }
  }
  console.log(
    'readAll- blocks, done:',
    blocks.length,
    streamDone
  );
  return [blocks, streamDone];
}
在调用 controller.close() 之后,还需要调用
controller.byobRequest
的 respond() 方法,并将 0
作为已写入的字节数(bytesWritten)传入。
// Demo driver for the answer: same setup as the question (more blocks are
// requested than the stream produces), run against the stream whose pull()
// responds with 0 bytes after closing.
const readBlocks = 4;
const streamBlocks = 3;
const blockSize = 1024;

const stream = makeStream(streamBlocks);
const reader = stream.getReader({ mode: "byob" });
let buffer = new Uint8Array(blockSize * readBlocks);

readAllBYOB(reader, buffer).then(([blocks, done]) => {
  reader.releaseLock();
  // Sum the sizes of every view that was handed back by the reader.
  const byteLen = blocks.reduce((sum, block) => sum + block.byteLength, 0);
  console.log("all done, bytes read:", byteLen, done);
});
/**
 * Builds a demo readable byte stream whose `pull` settles a pending BYOB
 * read when the data runs out.
 *
 * `start` enqueues one block of `blockSize` bytes filled with "s"; every
 * `pull` enqueues one more block filled with "p" until at least
 * `loops * blockSize` bytes have been produced. The next `pull` closes the
 * controller and then, if a BYOB request exists, responds to it with 0 bytes.
 *
 * `blockSize` is read from the enclosing scope.
 *
 * @param {number} loops - Number of blocks after which the stream closes.
 * @returns {ReadableStream} A stream created with `type: "bytes"`.
 */
function makeStream(loops) {
  let emittedBytes = 0;
  console.log("creating stream size:", loops * blockSize);

  const byteSource = {
    type: "bytes",

    async start(controller) {
      console.log(
        `stream start- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );
      try {
        const chunk = new TextEncoder().encode("s".repeat(blockSize));
        emittedBytes += chunk.byteLength;
        console.log("stream start- enqueuing, total:", chunk.byteLength, emittedBytes);
        controller.enqueue(chunk);
      } catch (failure) {
        console.error("stream start- error, closing", failure);
        controller.error(failure);
      }
    },

    async pull(controller) {
      // The byobRequest's view is never filled; the request is only logged
      // and, on close, answered with zero bytes.
      console.log(
        `stream pull- ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
      );
      try {
        // Pretend the source only learns it is out of data once a read has
        // been requested.
        if (emittedBytes >= blockSize * loops) {
          console.log("stream pull- closing");
          // Order matters: close first, then respond with 0 so the pending
          // BYOB read (if any) is settled instead of left waiting.
          controller.close();
          controller.byobRequest?.respond(0);
        } else {
          const chunk = new TextEncoder().encode("p".repeat(blockSize));
          emittedBytes += chunk.byteLength;
          console.log("stream pull- enqueuing, total:", chunk.byteLength, emittedBytes);
          controller.enqueue(chunk);
        }
      } catch (failure) {
        console.error("stream pull- error, closing", failure);
        controller.error(failure);
      }
    },
  };

  return new ReadableStream(byteSource);
}
/**
 * Reads from a BYOB reader until `output.byteLength` bytes have been
 * collected, the reader reports `done`, or a read yields no value.
 *
 * The first read uses `output`; every later read uses a freshly allocated
 * Uint8Array sized to the bytes still missing. Each returned view is kept as
 * its own entry, so a zero-length view delivered together with `done` is
 * also appended.
 *
 * @param {{ read(view: Uint8Array): Promise<{ done: boolean, value?: Uint8Array }> }} reader
 *   BYOB-style reader.
 * @param {Uint8Array} output - View used for the first read; its length is
 *   the total number of bytes wanted.
 * @returns {Promise<[Uint8Array[], boolean]>} The collected views and the
 *   `done` flag of the last read (false if no read was issued).
 */
async function readAllBYOB(reader, output) {
  const wantedBytes = output.byteLength;
  const collected = [];
  let receivedBytes = 0;
  let finished = false;
  let view = output;

  console.log('readAllBYOB- start: ', wantedBytes);

  while (receivedBytes < wantedBytes) {
    console.log('readAllBYOB- try reading:', view.byteLength);
    const { done: ended, value: chunk } = await reader.read(view);
    console.log('readAllBYOB- read, done:', chunk?.byteLength, ended);

    finished = ended;
    if (chunk) {
      collected.push(chunk);
      receivedBytes += chunk.byteLength;
    }
    if (ended || !chunk) {
      break;
    }
    // The previous view was handed to the stream; allocate a new one for
    // whatever is still missing.
    if (receivedBytes < wantedBytes) {
      view = new Uint8Array(wantedBytes - receivedBytes);
    }
  }

  console.log(
    'readAllBYOB- blocks, remainingBytes, done:',
    collected.length,
    wantedBytes - receivedBytes,
    finished
  );
  return [collected, finished];
}