我有以下方法:
pub fn load_names(&self, req: &super::MagicQueryType) -> ::grpcio::Result<::grpcio::ClientSStreamReceiver<String>> {
我的目标是获得grpcio::ClientSStreamReceiver
的第一个元素;我不关心其他名字:
let name: String = load_names(query)?.wait().nth(0)?;
在wait()
之前调用nth(0)
似乎效率低下,因为我相信wait()
阻止了直到收到所有元素的流。
如何在不触发构建错误的情况下编写更有效的解决方案(即nth(0).wait()
)? Rust的futures::stream::Stream
构建错误对我来说非常混乱。
Rust playground不支持grpcio = "0.4.4"
所以我无法提供链接。
要以阻塞方式提取futures::Stream
的第一个元素,您应该通过调用Stream
将Stream::wait
转换为迭代器,然后调用Iterator::next
。阅读Stream::wait
的文档表明它基本上是最有效的实现:
如果流中的项尚未准备就绪,则此迭代器将在每次调用
next
时阻塞当前线程。
use futures::{stream, Stream}; // 0.1.25
use std::iter;
fn example() -> impl Stream<Item = i32, Error = ()> {
stream::iter_ok(iter::repeat(42))
}
fn main() {
let v = example().wait().next();
println!("{:?}", v);
}
如果您使用的是Tokio,您可以将Stream
转换为Future
并调用Runtime::block_on
:
use std::iter;
use tokio::prelude::*; // 0.1.15
fn example() -> impl Stream<Item = i32, Error = ()> {
stream::iter_ok(iter::repeat(42))
}
fn main() {
let mut runtime = tokio::runtime::Runtime::new().expect("Unable to create a runtime");
let r = runtime.block_on(example().into_future());
if let Ok((v, _)) = r {
println!("{:?}", v);
}
}
也可以看看: