如何从流中复制数据同时转发流

问题描述 投票:0回答:2

我正在使用 hyper 0.12 构建代理服务。当从上游服务器接收到响应正文时,我想尽快将其转发回客户端,并且将内容保存在缓冲区中以供以后处理。

所以我需要一个函数:

  • 需要一个
    Stream
    (准确地说,是一个
    hyper::Body
  • 返回一个与输入流功能相同的
    Stream
  • 返回某种
    Future<Item = Vec<u8>, Error = ...>
    ,当输出流完全消耗时,它可以通过输入流的缓冲内容来解析

我一辈子都不知道该怎么做。

我想我正在寻找的功能看起来像这样:

type BufferFuture = Box<Future<Item = Vec<u8>, Error = ()>>;
pub fn copy_body(body: hyper::Body) -> (hyper::Body, BufferFuture) {
    let body2 = ... // ???
    let buffer = body.fold(Vec::<u8>::new(), |mut buf, chunk| {
        buf.extend_from_slice(&chunk);
        // ...somehow send this chunk to body2 also?
    });
    (body2, buffer);
}

以下是我尝试过的方法,它一直有效,直到

send_data()
失败(显然)。

type BufferFuture = Box<Future<Item = Vec<u8>, Error = ()>>;
pub fn copy_body(body: hyper::Body) -> (hyper::Body, BufferFuture) {
    let (mut sender, body2) = hyper::Body::channel();
    let consume =
        body.map_err(|_| ()).fold(Vec::<u8>::new(), move |mut buf, chunk| {
            buf.extend_from_slice(&chunk);

            // What to do if this fails?
            if sender.send_data(chunk).is_err() {}
            Box::new(future::ok(buf))
        });

    (body2, Box::new(consume));
}

然而,有些事情告诉我我走错了路。

我找到了

Sink.fanout()
,这似乎就是我想要的,但我没有
Sink
,而且我不知道如何构建一个。
hyper::Body
实现
Stream
但不实现
Sink

rust hyper
2个回答
5
投票

我最终所做的是实现一种新型的流来满足我的需要。这似乎是必要的,因为

hyper::Body
不实现
Sink
也不实现
hyper::Chunk
(这是
Clone
所必需的),所以我不能使用任何现有的组合器。

首先是一个结构体,其中包含我们需要的所有详细信息和附加新块的方法,以及通知缓冲区已完成。

Sink.fanout()

然后我为这个结构实现了 
struct BodyClone<T> { body: T, buffer: Option<Vec<u8>>, sender: Option<futures::sync::oneshot::Sender<Vec<u8>>>, } impl BodyClone<hyper::Body> { fn flush(&mut self) { if let (Some(buffer), Some(sender)) = (self.buffer.take(), self.sender.take()) { if sender.send(buffer).is_err() {} } } fn push(&mut self, chunk: &hyper::Chunk) { use hyper::body::Payload; let length = if let Some(buffer) = self.buffer.as_mut() { buffer.extend_from_slice(chunk); buffer.len() as u64 } else { 0 }; if let Some(content_length) = self.body.content_length() { if length >= content_length { self.flush(); } } } }

特征。


Stream

最后我可以在
impl Stream for BodyClone<hyper::Body> { type Item = hyper::Chunk; type Error = hyper::Error; fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> { match self.body.poll() { Ok(Async::Ready(Some(chunk))) => { self.push(&chunk); Ok(Async::Ready(Some(chunk))) } Ok(Async::Ready(None)) => { self.flush(); Ok(Async::Ready(None)) } other => other, } } }

上定义一个扩展方法:


hyper::Body

可以按如下方式使用:

pub type BufferFuture = Box<Future<Item = Vec<u8>, Error = ()> + Send>; trait CloneBody { fn clone_body(self) -> (hyper::Body, BufferFuture); } impl CloneBody for hyper::Body { fn clone_body(self) -> (hyper::Body, BufferFuture) { let (sender, receiver) = futures::sync::oneshot::channel(); let cloning_stream = BodyClone { body: self, buffer: Some(Vec::new()), sender: Some(sender), }; ( hyper::Body::wrap_stream(cloning_stream), Box::new(receiver.map_err(|_| ())), ) } }



0
投票

let (body: hyper::Body, buffer: BufferFuture) = body.clone_body();

使用示例:

use bytes::Bytes; use hyper::body::{Body, Frame, Incoming}; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::sync::mpsc; pub struct BodyClone<T> { inner: T, tx: mpsc::Sender<Bytes>, end_stream: bool, } impl BodyClone<Incoming> { pub fn new(body: Incoming) -> (Self, mpsc::Receiver<Bytes>) { let (tx, rx) = mpsc::channel(10); ( BodyClone { inner: body, tx, end_stream: false, }, rx, ) } } impl Body for BodyClone<Incoming> { type Data = Bytes; type Error = hyper::Error; fn poll_frame( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> { match Pin::new(&mut self.inner).poll_frame(cx) { Poll::Ready(Some(Ok(frame))) => { if let Ok(data) = frame.into_data() { let _ = self.tx.try_send(data.clone()); Poll::Ready(Some(Ok(Frame::data(data)))) } else { Poll::Ready(None) } } Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), Poll::Ready(None) => { self.end_stream = true; Poll::Ready(None) } Poll::Pending => Poll::Pending, } } }

	
© www.soinside.com 2019 - 2024. All rights reserved.