如何以阻塞方式有效地提取期货::流的第一个元素?

问题描述 投票:-1回答:1

我有以下方法:

pub fn load_names(&self, req: &super::MagicQueryType) -> ::grpcio::Result<::grpcio::ClientSStreamReceiver<String>> {

我的目标是获得grpcio::ClientSStreamReceiver的第一个元素;我不关心其他名字:

let name: String = load_names(query)?.wait().nth(0)?;

wait()之前调用nth(0)似乎效率低下,因为我相信wait()阻止了直到收到所有元素的流。

如何在不触发构建错误的情况下编写更有效的解决方案(即nth(0).wait())? Rust的futures::stream::Stream构建错误对我来说非常混乱。

Rust playground不支持grpcio = "0.4.4"所以我无法提供链接。

stream rust
1个回答
2
投票

要以阻塞方式提取futures::Stream的第一个元素,您应该通过调用StreamStream::wait转换为迭代器,然后调用Iterator::next。阅读Stream::wait的文档表明它基本上是最有效的实现:

如果流中的项尚未准备就绪,则此迭代器将在每次调用next时阻塞当前线程。

use futures::{stream, Stream}; // 0.1.25
use std::iter;

fn example() -> impl Stream<Item = i32, Error = ()> {
    stream::iter_ok(iter::repeat(42))
}

fn main() {
    let v = example().wait().next();
    println!("{:?}", v);
}

如果您使用的是Tokio,您可以将Stream转换为Future并调用Runtime::block_on

use std::iter;
use tokio::prelude::*; // 0.1.15

fn example() -> impl Stream<Item = i32, Error = ()> {
    stream::iter_ok(iter::repeat(42))
}

fn main() {
    let mut runtime = tokio::runtime::Runtime::new().expect("Unable to create a runtime");
    let r = runtime.block_on(example().into_future());
    if let Ok((v, _)) = r {
        println!("{:?}", v);
    }
}

也可以看看:

© www.soinside.com 2019 - 2024. All rights reserved.