我正在玩弄 Nodejs 流。本质上,我正在读取一个文件,通过一些处理来运行它 - 在我的例子中,将文本设置为大写 - 然后将输出转储到前端。
在前端,我有一个
EventSource
,它建立与端点的连接并获取数据。
唯一的“问题”是我试图模拟转换逻辑中的延迟。
const express = require('express');
const app = express();
const port = 3010;
const path = require('path');
const { Transform } = require('node:stream');
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
app.use(express.static('static'));
app.get('/', (req, res) => {
res.sendFile(path.resolve('pages/index.html'));
});
const processChunk = async function (chunk) {
// simulate delay
await new Promise((resolve, reject) => {
setTimeout(resolve, 50);
});
return chunk.toUpperCase();
};
app.get('/hello', async (req, res) => {
res.type('text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
const readStream = fs.createReadStream('./sample.txt', {
highWaterMark: 5 * 1024, // does not work unless this value is changed to 2.5 * 1024
});
async function* processStream(source, { signal }) {
source.setEncoding('utf-8');
for await (const chunk of source) {
yield processChunk(chunk);
}
}
const transformStream = new Transform({
async transform(chunk, encoding, callback) {
const s = chunk.toString();
res.write(`data: ${s}\n\n`);
callback();
},
});
transformStream.on('end', () => {
console.log('ending');
res.end();
});
await pipeline(readStream, processStream, transformStream);
});
app.listen(port, () => {
console.log(`Example app listening at http://localhost:${port}`);
});
我有一个20kb的文件;现在,只要
highWaterMark
值不超过 2.5kb,所有这些都有效。
因此 highWaterMark 设置应在内存中传输多少数据的阈值,直到缓冲区清空时暂停为止。 因此,基本上您可以将服务器缓冲区视为文件和前端之间的管道,并且该管道只能包含这么多的水。 highWaterMark 的目的是告诉你的程序在需要等待管道清空之前它可以处理多少数据。
所以发生的情况是,您的服务器无法处理
5 * 1024
的数据,但它可以处理批量 2.5*1024
的数据。基本上,如果没有适当的高水位标记,您最终会出现崩溃、内存使用率高和垃圾收集器性能差的情况。 您还可以查看 back压 以及资源。
您可以通过增加服务器可用资源来保持该数字不变。