我有一个通过期货发送的物品清单:: Sink
:
let mut list = VecDeque::new();
/* add a bunch of Packet items to list */
let (sink, stream) = tcp_stream.framed(PacketCodec).split();
我可以使用发送一个数据包
if let Some(first) = list.pop_front() {
sink.send(first);
}
如何发送整个列表?
当使用新的软件时,我觉得在深入潜水之前阅读文档非常有用。一般来说,Rust社区提供了非常好的资源。
例如,Tokio项目有一个entire site of documentation,其中包括selection of working examples。 generated API documentation for Sink
在这里也很有价值。
我建议所有程序员学会做的另一件事是创建问题的MCVE。这使他们能够专注于问题的核心,同时也消除了问题背后的瑕疵和磨练。这是一个案例:
use futures; // 0.1.26
fn thing<S>(sink: S)
where
S: futures::Sink<SinkItem = i32>,
{
let mut all_the_things = vec![1, 2, 3, 4, 5];
while let Some(v) = all_the_things.pop() {
sink.send(v);
}
}
我们收到错误消息
error[E0382]: use of moved value: `sink`
--> src/lib.rs:10:9
|
3 | fn thing<S>(sink: S)
| - ---- move occurs because `sink` has type `S`, which does not implement the `Copy` trait
| |
| consider adding a `Copy` constraint to this type argument
...
10 | sink.send(v);
| ^^^^ value moved here, in previous iteration of loop
让我们回到API文档......
fn send(self, item: Self::SinkItem) -> Send<Self>
where
Self: Sized,
由此,我们可以看到send
消耗了接收器并返回Send
值。那很奇怪,不是吗?实际上并没有那么奇怪,一旦你看到Send
实现Future
- 如果流已满,可能会阻止某些内容进入流中;这是背压的概念。将结果设为Future
就是如何知道项目何时实际添加到流中。 Future
为Send
的实施有Item = S
,Sink
的具体类型。因此,一种解决方案是将未来推向完成以获得回收,其中一种方法是调用wait
:
use futures::Future; // 0.1.26
use std::fmt;
fn thing<S>(mut sink: S)
where
S: futures::Sink<SinkItem = i32>,
S::SinkError: fmt::Debug,
{
let mut all_the_things = vec![1, 2, 3, 4, 5];
while let Some(v) = all_the_things.pop() {
let f = sink.send(v);
sink = f.wait().expect("Something bad happened while sending");
}
}
这编译,但不是非常漂亮。回到文档,我们还可以看到Sink::send_all
,它需要一个Stream
。我们可以使用Stream
从迭代器创建一个stream::iter_ok
:
use futures::Future; // 0.1.26
use std::fmt;
fn thing<S>(sink: S)
where
S: futures::Sink<SinkItem = i32>,
S::SinkError: fmt::Debug,
{
let all_the_things = vec![1, 2, 3, 4, 5];
sink.send_all(futures::stream::iter_ok(all_the_things))
.wait()
.expect("Something bad happened while sending");
}
再次,我们使用wait
来推动未来的完成。如果我们忘记这是一个很好的错误:“除非进行调查,否则期货什么都不做”。然而,wait
并不是处理这个问题的正确方法。通常,您希望将Future
返回给调用者,因为他们想知道何时将所有内容都塞进了接收器。