Rust 中有状态处理异步数据的模式

问题描述 投票:0回答:1

考虑以下 Rust 中的完整示例:

use async_stream::stream;
use futures::{pin_mut, StreamExt};
use std::time::Duration;
use tokio::time::sleep;

pub struct Processor {
    pub state: i32,
}

impl Processor {
    pub fn process(&mut self, x: i32) -> i32 {
        self.state += 1;
        self.state * x
    }
}

#[tokio::main]
async fn main() {
    let mut p = Processor { state: 1 };

    let stream = stream! {
        for i in 1..=10 {
            sleep(Duration::from_millis(100)).await;
            yield i
        }
    };

    let s2 = stream.filter_map(move |m| async {
        match m {
            7 => None,
            _ => Some(p.process(m)),
        }
    });
    pin_mut!(s2);

    while let Some(v) = s2.next().await {
        println!("Next value {:?}", v);
    }
}

此设计非常简单,有一个状态处理适配器,用于接收项目、更改状态并返回一些值。

但是,此示例无法编译,因为:

error: captured variable cannot escape `FnMut` closure body
  --> src/main.rs:28:41
   |
19 |       let mut p = Processor { state: 1 };
   |           ----- variable defined here
...
28 |       let s2 = stream.filter_map(move |m| async {
   |  _______________________________________-_^
   | |                                       |
   | |                                       inferred to be a `FnMut` closure
29 | |         match m {
30 | |             7 => None,
31 | |             _ => Some(p.process(m)),
   | |                       - variable captured here
32 | |         }
33 | |     });
   | |_____^ returns an `async` block that contains a reference to a captured variable, which then escapes the closure body

问题在于,一方面,我们有一个保存状态的适配器,另一方面,程序的异步部分(在本例中是

filter_map
)需要保存对该适配器的可变引用。

实现这种异步循环的正确设计是什么?在某些情况下,可以将异步部分分开,例如首先应用同步

map
,然后将异步
filter_map
链接到它,但我的问题是针对更一般的情况,其中异步块必须 持有一些可变引用。

这种情况下的另一个逃避方法是克隆处理器,但为了讨论起见,我们假设这对性能不利。

asynchronous rust rust-tokio
1个回答
0
投票

StreamExt::filter_map()
(和类似的方法)不能支持这一点,它们不是为此设计的(问题#2464,我认为它们不能被重新设计来处理这个问题。

您可以创建自己的流组合器来做到这一点,但即使这样也不是那么简单。目前不可能(据我所知)有一个签名可以接受所有借用其参数的异步块。有关详细信息,请参阅使用(可变)借用参数调用通用异步函数。一个可能的部分解决方案是对返回的 future 进行装箱并返回

Pin<Box<dyn Future>>
(以内存分配为代价),或者使用特质魔法来实现,但这仅适用于异步函数,不适用于返回异步块的闭包(至少,并非没有奇怪的签名和不稳定的功能,例如
type_alias_impl_trait
closure_lifetime_binder
)。

© www.soinside.com 2019 - 2024. All rights reserved.