我正在使用
futures-util
板条箱在 Rust 中玩弄流。
我正在创建一个流,在每个步骤中从分页 API 请求一个新页面。 每个页面的项目基本上是一个 Vector。很明显,每次调用服务器请求新页面都可能失败。
我不想返回向量流,而是返回在尝试从失败的页面中获取项目时产生错误(并因此关闭流)的项目流。
这是我当前的功能:
pub fn search(
&self,
query: Query,
) -> impl TryStream<Ok = Vec<Item>, Error = MyError> {
let initial_state = (query, None);
stream::try_unfold(initial_state, |(query, page_token)| async move {
let (items, next_page_token) = fetch_page(&query, page_token).await?;
if items.len() <= 0 {
Ok(None)
} else {
Ok(Some((items, (query, next_page_token))))
}
})
}
我想修改它所以返回类型是
impl TryStream<Ok = Item, Error = MyError>
有可能吗?
try_flatten()
,但是它要求每个内部流也是一个TryStream
所以你需要映射每个项目:
use futures_util::stream::TryStreamExt;
pub fn search(query: Query) -> impl TryStream<Ok = Item, Error = MyError> {
let initial_state = (query, None);
stream::try_unfold(initial_state, |(query, page_token)| async move {
let (items, next_page_token) = fetch_page(&query, page_token).await?;
if items.len() <= 0 {
Ok(None)
} else {
Ok(Some((items, (query, next_page_token))))
}
})
.map_ok(|items| stream::iter(items.into_iter().map(|item| Ok(item))))
.try_flatten()
}
但是请注意,返回
impl Stream<Item = Result<Item, MyError>>
比返回 impl TryStream
更好,因为 impl Stream
也会自动执行 TryStream
但反之则不然。