Nodejs PassThrough流

问题描述 投票:2回答:1

我想通过net.Socket(TCP)流传输fs.Readstream。为此,我使用.pipe。当fs.Readstream完成时,我不想结束net.Socket流。这就是为什么我使用

readStream.pipe(socket, { end: false })

不幸的是,我没有在另一侧获得“接近”,“完成”或“结束”。这样可以防止我关闭另一侧的fs.Writestream。但是,保留了net.Socket连接,我也需要它,因为我想收到一个ID作为响应。由于我没有得到“关闭”或“完成”的信息,因此很遗憾,我无法结束fs.Writestream,因此无法发送带有相应ID的响应

是否可以通过net.socket手动发送'close'或'finish'事件而不关闭它?使用该命令,只有我自己的事件会发生反应。谁能告诉我我做错了吗?

    var socket : net.Socket; //TCP connect
    var readStream = fs.createWriteStream('test.txt');

    socket.on('connect', () => {
        readStream.pipe(socket, {
            end: false
        })
        readStream.on('close', () => {
            socket.emit('close');
            socket.emit('finish');
        })

        //waiting for answer
        //waiting for answer
        //waiting for answer

        socket.on('data', (c) => {
            console.log('got my answer: ' + c.toString());
        })
    })
}
node.js stream pass-through
1个回答
2
投票

好吧,除了提供另一种方式让另一端知道该流已经以编程方式结束之外,单个流几乎没有什么可以做的。

[当套接字发送end事件时,它实际上会清空缓冲区,然后关闭TCP连接,然后,在最后一个字节交付后,另一端将其转换为finish。为了重新使用连接,您可以考虑以下两个选项:

一个:使用HTTP保持活动状态

您可以想象您不是遇到此问题的第一个人。实际上这是很平常的事情,您已经介绍了HTTP之类的协议。这将带来较小的开销,但仅在流的开始和结束时-在您的情况下,这比其他选项更容易接受。]

除了使用基本的TCP流之外,您还可以简单地使用HTTP连接并通过http请求发送数据,HTTP POST请求就很好了,除了放弃{end: false}之外,您的代码看起来没有什么不同。套接字将需要发送其标头,因此将这样构造:

const socket : HTTP.ClientRequest = http.request({method: 'POST', url: '//wherever.org/somewhere/there:9087', headers: {
   'connection': 'keep-alive',
   'transfer-encoding': 'chunked'
}}, (res) => {
   // here you can call the code to push more streams since the 
});

readStream.pipe(socket); // so our socket (vel connection) will end, but the underlying channel will stay open.

实际上,您不需要等待套接字连接,就可以像上面的示例一样直接通过管道传输流,但是请检查连接失败时的行为。因为HTTP请求类实现了所有TCP连接事件和方法,所以您等待的connect事件也将起作用(尽管签名可能会有一些细微的差别)。

更多阅读:

两个:使用“魔术”结束包

在这种情况下,您要发送一个简单的结束数据包,例如:套接字末尾的\x00(一个nul字符)。这有一个主要缺点,因为您将需要对流进行某些操作,以确保否则不会出现nul字符-这会增加数据处理的开销(因此会占用更多CPU)。

为了做到这一点,您需要先将数据通过转换流推送,然后再将其发送到套接字-这是一个示例,但是它仅适用于字符串,因此请根据需要进行调整。

const zeroEncoder = new Transform({
  encoding: 'utf-8',
  transform(chunk, enc, cb) { cb(chunk.toString().replace('\x00', '\\x00')); },
  flush: (cb) => cb('\x00')
});

// ... whereever you do the writing:

readStream
  .pipe(zeroEncoder)
  .on('unpipe', () => console.log('this will be your end marker to send in another stream'))
  .pipe(socket, {end: false})

然后在另一边:

tcpStream.on('data', (chunk) => {
  if (chunk.toString().endsWith('\x00')) {
    output.end(decodeZeros(chunk));
    // and rotate output
  } else {
    output.write(decodeZeros(chunk));
  }
});

正如您所看到的,这要复杂得多,这只是一个示例-您可以通过使用JSON,7位传输编码或其他方式来稍微简化一下,但是在所有情况下,这都需要一些技巧和大多数重要的是通读整个流,并为此增加更多的内存-因此,我并不真正推荐这种方法。如果您这样做:

  • 确保您正确编码/解码数据
  • 考虑是否可以找到一个不会出现在数据中的字节
  • 上面的内容可能适用于字符串,但至少对于缓冲区来说是不好的
  • 最后没有错误控制或流控制-因此至少需要pause / resume逻辑。
  • 我希望这会有所帮助。

© www.soinside.com 2019 - 2024. All rights reserved.