Rust Streams:如何将 TryStream<Ok = Vec<Item>、Error = MyErrorType>> 转换为 TryStream<Ok = Item, Error = MyErrorType>

问题描述 投票:0回答:1

我正在使用

futures-util
板条箱在 Rust 中玩弄流。

我正在创建一个流,在每个步骤中从分页 API 请求一个新页面。 每个页面的项目基本上是一个 Vector。很明显,每次调用服务器请求新页面都可能失败。

我不想返回向量流,而是返回在尝试从失败的页面中获取项目时产生错误(并因此关闭流)的项目流。

这是我当前的功能:

    pub fn search(
        &self,
        query: Query,
    ) -> impl TryStream<Ok = Vec<Item>, Error = MyError> {
        let initial_state = (query, None);
        stream::try_unfold(initial_state, |(query, page_token)| async move {
            let (items, next_page_token) = fetch_page(&query, page_token).await?;
            if items.len() <= 0 {
                Ok(None)
            } else {
                Ok(Some((items, (query, next_page_token))))
            }
        })
    }

我想修改它所以返回类型是

impl TryStream<Ok = Item, Error = MyError>

有可能吗?

asynchronous rust async-await stream
1个回答
0
投票

你可以使用

try_flatten()
,但是它要求每个内部流也是一个
TryStream
所以你需要映射每个项目:

use futures_util::stream::TryStreamExt;

pub fn search(query: Query) -> impl TryStream<Ok = Item, Error = MyError> {
    let initial_state = (query, None);
    stream::try_unfold(initial_state, |(query, page_token)| async move {
        let (items, next_page_token) = fetch_page(&query, page_token).await?;
        if items.len() <= 0 {
            Ok(None)
        } else {
            Ok(Some((items, (query, next_page_token))))
        }
    })
    .map_ok(|items| stream::iter(items.into_iter().map(|item| Ok(item))))
    .try_flatten()
}

但是请注意,返回

impl Stream<Item = Result<Item, MyError>>
比返回
impl TryStream
更好,因为
impl Stream
也会自动执行
TryStream
但反之则不然。

© www.soinside.com 2019 - 2024. All rights reserved.