我有一个基于参数生成futures::Stream
的函数。我想多次调用此函数并将流放在一起。麻烦的是,我想将流返回的值作为原始函数的参数反馈回来。
具体来说,我有一个函数可以将数字流返回到零:
fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
stream::iter((0..v).rev())
}
我想从5开始调用此函数。还应该为返回的每个奇数调用该函数。呼叫numbers_down_to_zero
的总次数为:
numbers_down_to_zero(5);
numbers_down_to_zero(3);
numbers_down_to_zero(1);
numbers_down_to_zero(1);
产生的总流
4
3
2
1
0
2
1
0
0
0
存在哪些技术可以做到这一点?
这些是我发现的部分解决方案,但由于各种原因而缺乏。
我不喜欢这种解决方案,因为我认为这个一般性问题不需要内部可变性,但是在这里是必需的,因为借位检查器不知道对闭包的调用将如何交织。
use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;
fn x(v: i32) -> impl Stream<Item = i32> {
stream::iter((0..v).rev())
}
use std::{cell::RefCell, rc::Rc};
fn y0() -> impl Stream<Item = i32> {
let to_visit = Rc::new(RefCell::new(VecDeque::from(vec![5])));
let to_visit_b = to_visit.clone();
stream::unfold(to_visit, |to_visit| async {
let i = to_visit.borrow_mut().pop_back()?;
Some((x(i), to_visit))
})
.flatten()
.inspect(move |&x| {
if x % 2 != 0 {
to_visit_b.borrow_mut().push_front(x);
}
})
}
#[tokio::main]
async fn main() {
y0().for_each(|v| async move {
println!("v: {}", v);
})
.await;
}
Stream::poll_next
的自定义实现我不喜欢这种解决方案,因为它很冗长,并且需要难以理解的棘手的unsafe
代码(我什至不确定我的正确性!)
use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;
fn x(v: i32) -> impl Stream<Item = i32> {
stream::iter((0..v).rev())
}
use std::{
pin::Pin,
task::{Context, Poll},
};
struct X<St, C, R, S>
where
C: Fn(&mut St) -> Option<S>,
R: Fn(&mut St, &mut S::Item),
S: Stream,
{
state: St,
create: C,
review: R,
current: Option<S>,
}
impl<St, C, R, S> Stream for X<St, C, R, S>
where
C: Fn(&mut St) -> Option<S>,
R: Fn(&mut St, &mut S::Item),
S: Stream,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let (state, create, review, current) = unsafe {
let Self {
state,
create,
review,
current,
} = self.get_unchecked_mut();
(state, create, review, current)
};
loop {
if let Some(current) = current {
let v = unsafe { futures::ready!(Pin::new_unchecked(current).poll_next(ctx)) };
if let Some(mut v) = v {
review(state, &mut v);
return Poll::Ready(Some(v));
}
}
*current = create(state);
if current.is_none() {
return Poll::Ready(None);
}
}
}
}
fn y1() -> impl Stream<Item = i32> {
X {
state: VecDeque::from(vec![5]),
create: |to_visit| {
let i = to_visit.pop_back()?;
Some(x(i))
},
review: |to_visit, &mut x| {
if x % 2 != 0 {
to_visit.push_front(x);
}
},
current: None,
}
}
#[tokio::main]
async fn main() {
y1().for_each(|v| async move {
println!("v: {}", v);
})
.await;
}
这不起作用,因为发送方从未被丢弃,接收方也从未被丢弃,因为发送方从未被丢弃...
除了不起作用之外,还有很多缺点:
async
才能推动初始值访问。 Sender
闭包内克隆then
。use futures::{stream, Stream, StreamExt};
fn x(v: i32) -> impl Stream<Item = i32> {
stream::iter((0..v).rev())
}
use futures::channel::mpsc;
use futures::sink::SinkExt;
async fn y2() -> impl Stream<Item = i32> {
let (mut tx, rx) = mpsc::unbounded();
tx.send(5).await.unwrap();
rx.map(x).flatten().then(move |x| {
let mut tx = tx.clone();
async move {
if x % 2 != 0 {
tx.send(x).await.unwrap();
}
x
}
})
}
#[tokio::main]
async fn main() {
y2().await
.for_each(|v| async move {
println!("v: {}", v);
})
.await;
}
您可以使用unfold
解决此问题,方法是使用一个“状态”结构,该结构既保留“基本流”(在这种情况下,倒计数为零),又保留将产生新流的项目列表,并使用该结构作为unfold
在展开时保持状态的参数。
通过这种方式,编译器不必考虑生存期的所有权,因为对于每次关闭的调用,状态都可以移到async
块中。
/// Base stream (counting down to zero).
fn f(n: i32) -> impl Stream<Item = i32> {
stream::iter((0..n).rev())
}
/// "Recursive" stream
fn g(n: i32) -> impl Stream<Item = i32> {
/// Helper struct to keep state while unfolding
struct StreamState<S> {
inner_stream: S,
item_queue: VecDeque<i32>,
}
// Build helper struct
let state = StreamState {
inner_stream: f(n),
item_queue: VecDeque::new(),
};
// Unfold with state
stream::unfold(state, |mut state| async move {
loop {
if let Some(item) = state.inner_stream.next().await {
// Iterate inner stream, and potentially push item to queue
if item % 2 == 1 {
state.item_queue.push_front(item);
}
break Some((item, state));
} else if let Some(item) = state.item_queue.pop_back() {
// If inner stream is exhausted, produce new stream from queue
// and repeat loop
state.inner_stream = f(item);
} else {
// If queue is empty, we are done
break None;
}
}
})
}