我如何创建一个流,其中这些项基于流先前返回的项?

问题描述 投票:0回答:1

我有一个基于参数生成futures::Stream的函数。我想多次调用此函数并将流放在一起。麻烦的是,我想将流返回的值作为原始函数的参数反馈回来。

具体来说,我有一个函数可以将数字流返回到零:

fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

我想从5开始调用此函数。还应该为返回的每个奇数调用该函数。呼叫numbers_down_to_zero的总次数为:

numbers_down_to_zero(5);
numbers_down_to_zero(3);
numbers_down_to_zero(1);
numbers_down_to_zero(1);

产生的总流

4
3
2
1
0
2
1
0
0
0

存在哪些技术可以做到这一点?

asynchronous rust stream future
1个回答
1
投票

这些是我发现的部分解决方案,但由于各种原因而缺乏。

使用具有内部可变性的组合器

我不喜欢这种解决方案,因为我认为这个一般性问题不需要内部可变性,但是在这里是必需的,因为借位检查器不知道对闭包的调用将如何交织。

use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;

fn x(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

use std::{cell::RefCell, rc::Rc};

fn y0() -> impl Stream<Item = i32> {
    let to_visit = Rc::new(RefCell::new(VecDeque::from(vec![5])));
    let to_visit_b = to_visit.clone();

    stream::unfold(to_visit, |to_visit| async {
        let i = to_visit.borrow_mut().pop_back()?;

        Some((x(i), to_visit))
    })
    .flatten()
    .inspect(move |&x| {
        if x % 2 != 0 {
            to_visit_b.borrow_mut().push_front(x);
        }
    })
}

#[tokio::main]
async fn main() {
    y0().for_each(|v| async move {
        println!("v: {}", v);
    })
    .await;
}

playground

Stream::poll_next的自定义实现

我不喜欢这种解决方案,因为它很冗长,并且需要难以理解的棘手的unsafe代码(我什至不确定我的正确性!)

use futures::{stream, Stream, StreamExt};
use std::collections::VecDeque;

fn x(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

use std::{
    pin::Pin,
    task::{Context, Poll},
};

struct X<St, C, R, S>
where
    C: Fn(&mut St) -> Option<S>,
    R: Fn(&mut St, &mut S::Item),
    S: Stream,
{
    state: St,
    create: C,
    review: R,
    current: Option<S>,
}

impl<St, C, R, S> Stream for X<St, C, R, S>
where
    C: Fn(&mut St) -> Option<S>,
    R: Fn(&mut St, &mut S::Item),
    S: Stream,
{
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let (state, create, review, current) = unsafe {
            let Self {
                state,
                create,
                review,
                current,
            } = self.get_unchecked_mut();
            (state, create, review, current)
        };

        loop {
            if let Some(current) = current {
                let v = unsafe { futures::ready!(Pin::new_unchecked(current).poll_next(ctx)) };
                if let Some(mut v) = v {
                    review(state, &mut v);
                    return Poll::Ready(Some(v));
                }
            }

            *current = create(state);
            if current.is_none() {
                return Poll::Ready(None);
            }
        }
    }
}

fn y1() -> impl Stream<Item = i32> {
    X {
        state: VecDeque::from(vec![5]),
        create: |to_visit| {
            let i = to_visit.pop_back()?;

            Some(x(i))
        },
        review: |to_visit, &mut x| {
            if x % 2 != 0 {
                to_visit.push_front(x);
            }
        },
        current: None,
    }
}

#[tokio::main]
async fn main() {
    y1().for_each(|v| async move {
        println!("v: {}", v);
    })
    .await;
}

playground


使用频道(无效)

这不起作用,因为发送方从未被丢弃,接收方也从未被丢弃,因为发送方从未被丢弃...

除了不起作用之外,还有很多缺点:

  • 状态必须隐式地是一个队列(与我想做的匹配,但不是很普遍)。
  • 它要求我的函数本身成为async才能推动初始值访问。
  • 我必须处理似乎无关的错误条件。
  • 我必须在Sender闭包内克隆then
use futures::{stream, Stream, StreamExt};

fn x(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

use futures::channel::mpsc;
use futures::sink::SinkExt;

async fn y2() -> impl Stream<Item = i32> {
    let (mut tx, rx) = mpsc::unbounded();

    tx.send(5).await.unwrap();

    rx.map(x).flatten().then(move |x| {
        let mut tx = tx.clone();
        async move {
            if x % 2 != 0 {
                tx.send(x).await.unwrap();
            }
            x
        }
    })
}

#[tokio::main]
async fn main() {
    y2().await
        .for_each(|v| async move {
            println!("v: {}", v);
        })
        .await;
}

playground


1
投票

您可以使用unfold解决此问题,方法是使用一个“状态”结构,该结构既保留“基本流”(在这种情况下,倒计数为零),又保留将产生新流的项目列表,并使用该结构作为unfold在展开时保持状态的参数。

通过这种方式,编译器不必考虑生存期的所有权,因为对于每次关闭的调用,状态都可以移到async块中。

/// Base stream (counting down to zero).
fn f(n: i32) -> impl Stream<Item = i32> {
    stream::iter((0..n).rev())
}

/// "Recursive" stream
fn g(n: i32) -> impl Stream<Item = i32> {
    /// Helper struct to keep state while unfolding
    struct StreamState<S> {
        inner_stream: S,
        item_queue: VecDeque<i32>,
    }

    // Build helper struct
    let state = StreamState {
        inner_stream: f(n),
        item_queue: VecDeque::new(),
    };

    // Unfold with state
    stream::unfold(state, |mut state| async move {
        loop {
            if let Some(item) = state.inner_stream.next().await {
                // Iterate inner stream, and potentially push item to queue
                if item % 2 == 1 {
                    state.item_queue.push_front(item);
                }
                break Some((item, state));
            } else if let Some(item) = state.item_queue.pop_back() {
                // If inner stream is exhausted, produce new stream from queue
                // and repeat loop
                state.inner_stream = f(item);
            } else {
                // If queue is empty, we are done
                break None;
            }
        }
    })
}

Full playground example

© www.soinside.com 2019 - 2024. All rights reserved.