我正在使用 hyper 0.12 构建代理服务。当从上游服务器接收到响应正文时,我想尽快将其转发回客户端,并且将内容保存在缓冲区中以供以后处理。
所以我需要一个函数:
Stream
(准确地说,是一个hyper::Body
)Stream
Future<Item = Vec<u8>, Error = ...>
,当输出流完全消耗时,它可以通过输入流的缓冲内容来解析我一辈子都不知道该怎么做。
我想我正在寻找的功能看起来像这样:
type BufferFuture = Box<Future<Item = Vec<u8>, Error = ()>>;
pub fn copy_body(body: hyper::Body) -> (hyper::Body, BufferFuture) {
let body2 = ... // ???
let buffer = body.fold(Vec::<u8>::new(), |mut buf, chunk| {
buf.extend_from_slice(&chunk);
// ...somehow send this chunk to body2 also?
});
(body2, buffer);
}
以下是我尝试过的方法,它一直有效,直到
send_data()
失败(显然)。
type BufferFuture = Box<Future<Item = Vec<u8>, Error = ()>>;
pub fn copy_body(body: hyper::Body) -> (hyper::Body, BufferFuture) {
let (mut sender, body2) = hyper::Body::channel();
let consume =
body.map_err(|_| ()).fold(Vec::<u8>::new(), move |mut buf, chunk| {
buf.extend_from_slice(&chunk);
// What to do if this fails?
if sender.send_data(chunk).is_err() {}
Box::new(future::ok(buf))
});
(body2, Box::new(consume));
}
然而,有些事情告诉我我走错了路。
Sink.fanout()
,这似乎就是我想要的,但我没有Sink
,而且我不知道如何构建一个。 hyper::Body
实现 Stream
但不实现 Sink
。
我最终所做的是实现一种新型的流来满足我的需要。这似乎是必要的,因为
hyper::Body
不实现 Sink
也不实现 hyper::Chunk
(这是 Clone
所必需的),所以我不能使用任何现有的组合器。首先是一个结构体,其中包含我们需要的所有详细信息和附加新块的方法,以及通知缓冲区已完成。
Sink.fanout()
然后我为这个结构实现了
struct BodyClone<T> {
body: T,
buffer: Option<Vec<u8>>,
sender: Option<futures::sync::oneshot::Sender<Vec<u8>>>,
}
impl BodyClone<hyper::Body> {
fn flush(&mut self) {
if let (Some(buffer), Some(sender)) = (self.buffer.take(), self.sender.take()) {
if sender.send(buffer).is_err() {}
}
}
fn push(&mut self, chunk: &hyper::Chunk) {
use hyper::body::Payload;
let length = if let Some(buffer) = self.buffer.as_mut() {
buffer.extend_from_slice(chunk);
buffer.len() as u64
} else {
0
};
if let Some(content_length) = self.body.content_length() {
if length >= content_length {
self.flush();
}
}
}
}
特征。
Stream
最后我可以在
impl Stream for BodyClone<hyper::Body> {
type Item = hyper::Chunk;
type Error = hyper::Error;
fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
match self.body.poll() {
Ok(Async::Ready(Some(chunk))) => {
self.push(&chunk);
Ok(Async::Ready(Some(chunk)))
}
Ok(Async::Ready(None)) => {
self.flush();
Ok(Async::Ready(None))
}
other => other,
}
}
}
上定义一个扩展方法:
hyper::Body
可以按如下方式使用:
pub type BufferFuture = Box<Future<Item = Vec<u8>, Error = ()> + Send>;
trait CloneBody {
fn clone_body(self) -> (hyper::Body, BufferFuture);
}
impl CloneBody for hyper::Body {
fn clone_body(self) -> (hyper::Body, BufferFuture) {
let (sender, receiver) = futures::sync::oneshot::channel();
let cloning_stream = BodyClone {
body: self,
buffer: Some(Vec::new()),
sender: Some(sender),
};
(
hyper::Body::wrap_stream(cloning_stream),
Box::new(receiver.map_err(|_| ())),
)
}
}
let (body: hyper::Body, buffer: BufferFuture) = body.clone_body();
使用示例:
use bytes::Bytes;
use hyper::body::{Body, Frame, Incoming};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
pub struct BodyClone<T> {
inner: T,
tx: mpsc::Sender<Bytes>,
end_stream: bool,
}
impl BodyClone<Incoming> {
pub fn new(body: Incoming) -> (Self, mpsc::Receiver<Bytes>) {
let (tx, rx) = mpsc::channel(10);
(
BodyClone {
inner: body,
tx,
end_stream: false,
},
rx,
)
}
}
impl Body for BodyClone<Incoming> {
type Data = Bytes;
type Error = hyper::Error;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match Pin::new(&mut self.inner).poll_frame(cx) {
Poll::Ready(Some(Ok(frame))) => {
if let Ok(data) = frame.into_data() {
let _ = self.tx.try_send(data.clone());
Poll::Ready(Some(Ok(Frame::data(data))))
} else {
Poll::Ready(None)
}
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Ready(None) => {
self.end_stream = true;
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
}