How do I poll a Pin<Option<Box<dyn Future>>>?


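I am implementing a stream on top of an Option<Box<dyn Future>> that is generated recursively by an async function. How do I poll the resulting Pin<Option<Box<dyn Future>>>? Thanks.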

use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

use futures_util::Stream;

#[pin_project::pin_project]
struct BoxedStream<T, F> {
    #[pin]
    next: Option<String>,
    #[pin]
    inner: Option<Box<dyn Future<Output = T>>>,
    //             ^--- Future generated by async function, it is an opaque type.
    generate: F,
}

impl<T, F> Stream for BoxedStream<T, F>
where
    F: Fn(String) -> (Option<String>, Box<dyn Future<Output = T>>),
{
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut p = self.as_mut().project();
        if let Some(s) = p.inner.as_mut().as_pin_mut() {
            return match todo!("how to do it here? s.poll(cx)?") {
                result @ Poll::Ready(_) => {
                    p.inner.take();
                    result
                }
                _ => Poll::Pending,
            };
        }

        if let Some(next) = p.next.take() {
            let (next, future) = (p.generate)(next);
            p.inner.set(Some(future));
            p.next.set(next);
            return self.poll_next(cx);
        }

        Poll::Ready(None) // end
    }
}

When I replace the todo!() with s.poll(cx), the compiler reports this error:

error[E0599]: the method `poll` exists for struct `Pin<&mut Box<dyn Future<Output = T>>>`, but its trait bounds were not satisfied
  --> src/lib.rs:28:28
   |
28 |               return match s.poll(cx)? {
   |                              ^^^^ method cannot be called on `Pin<&mut Box<dyn Future<Output = T>>>` due to unsatisfied trait bounds
  --> /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/core/src/future/future.rs:37:1
   |
   = note: doesn't satisfy `_: Unpin`
  --> /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/alloc/src/boxed.rs:195:1
  ::: /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/alloc/src/boxed.rs:198:1
   |
   = note: doesn't satisfy `_: Future`
   |
   = note: the following trait bounds were not satisfied:
           `(dyn futures_util::Future<Output = T> + 'static): Unpin`
           which is required by `Box<(dyn futures_util::Future<Output = T> + 'static)>: futures_util::Future`

Tags: asynchronous, rust

1 Answer

As the last note in the error says, Box<dyn Future> is itself a Future only if the inner object is also Unpin.

So, by adding + Unpin to your types, it compiles:

use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

use futures_util::Stream;

#[pin_project::pin_project]
struct BoxedStream<T, F> {
    #[pin]
    next: Option<String>,
    #[pin]
    inner: Option<Box<dyn Future<Output = T> + Unpin>>,
    //             ^--- Future generated by async function, it is an opaque type.
    generate: F,
}

impl<T, F> Stream for BoxedStream<T, F>
where
    F: Fn(String) -> (Option<String>, Box<dyn Future<Output = T> + Unpin>),
{
    type Item = T;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut p = self.as_mut().project();
        if let Some(s) = p.inner.as_mut().as_pin_mut() {
            return match s.poll(cx) {
                Poll::Ready(result) => {
                    p.inner.take();
                    Poll::Ready(Some(result))
                }
                _ => Poll::Pending,
            };
        }

        if let Some(next) = p.next.take() {
            let (next, future) = (p.generate)(next);
            p.inner.set(Some(future));
            p.next.set(next);
            return self.poll_next(cx);
        }

        Poll::Ready(None) // end
    }
}
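
One caveat on the + Unpin bound: futures produced by an async fn are not Unpin, so they cannot be boxed directly into Box<dyn Future<Output = T> + Unpin>. A common alternative, which is not what the answer above uses, is to store Pin<Box<dyn Future<Output = T>>> and poll it through Pin::as_mut. The sketch below shows only that polling step, using the standard library alone; the names PinnedFuture, poll_slot, NoopWake, noop_waker and answer are hypothetical and exist only for illustration.

```rust
use std::{
    future::Future,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll, Wake, Waker},
};

// Hypothetical alias: a boxed future that is already pinned on the heap.
type PinnedFuture<T> = Pin<Box<dyn Future<Output = T>>>;

// Hypothetical helper: polls the future in `slot`, if any, and clears the
// slot once that future has completed.
fn poll_slot<T>(slot: &mut Option<PinnedFuture<T>>, cx: &mut Context<'_>) -> Poll<Option<T>> {
    let Some(fut) = slot.as_mut() else {
        return Poll::Ready(None); // nothing in flight
    };
    // `Pin<Box<_>>::as_mut` yields `Pin<&mut dyn Future>`, which is what `poll` takes.
    match fut.as_mut().poll(cx) {
        Poll::Ready(value) => {
            *slot = None; // the finished future must not be polled again
            Poll::Ready(Some(value))
        }
        Poll::Pending => Poll::Pending,
    }
}

// A waker that does nothing, only so the sketch can be polled by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}

// An ordinary async fn; its future is an opaque type that is not `Unpin`.
async fn answer() -> u32 {
    42
}
```

With this shape, the generator closure would return Box::pin(some_async_fn(...)) instead of Box::new(...), and the inner field would need neither the #[pin] attribute nor the Unpin bound, because Pin<Box<_>> is always Unpin regardless of what it points to.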