考虑以下 Rust 中的完整示例:
use async_stream::stream;
use futures::{pin_mut, StreamExt};
use std::time::Duration;
use tokio::time::sleep;
pub struct Processor {
pub state: i32,
}
impl Processor {
pub fn process(&mut self, x: i32) -> i32 {
self.state += 1;
self.state * x
}
}
#[tokio::main]
async fn main() {
let mut p = Processor { state: 1 };
let stream = stream! {
for i in 1..=10 {
sleep(Duration::from_millis(100)).await;
yield i
}
};
let s2 = stream.filter_map(move |m| async {
match m {
7 => None,
_ => Some(p.process(m)),
}
});
pin_mut!(s2);
while let Some(v) = s2.next().await {
println!("Next value {:?}", v);
}
}
此设计非常简单,有一个状态处理适配器,用于接收项目、更改状态并返回一些值。
但是,此示例无法编译,因为:
error: captured variable cannot escape `FnMut` closure body
--> src/main.rs:28:41
|
19 | let mut p = Processor { state: 1 };
| ----- variable defined here
...
28 | let s2 = stream.filter_map(move |m| async {
| _______________________________________-_^
| | |
| | inferred to be a `FnMut` closure
29 | | match m {
30 | | 7 => None,
31 | | _ => Some(p.process(m)),
| | - variable captured here
32 | | }
33 | | });
| |_____^ returns an `async` block that contains a reference to a captured variable, which then escapes the closure body
问题在于,一方面,我们有一个保存状态的适配器,另一方面,程序的异步部分(在本例中是
filter_map
)需要保存对该适配器的可变引用。
实现这种异步循环的正确设计是什么?在某些情况下,可以将异步部分分开,例如首先应用同步
map
,然后将异步 filter_map
链接到它,但我的问题是针对更一般的情况,其中异步块必须 持有一些可变引用。
这种情况下的另一个逃避方法是克隆处理器,但为了讨论起见,我们假设这对性能不利。
StreamExt::filter_map()
(和类似的方法)不能支持这一点,它们不是为此设计的(问题#2464,我认为它们不能被重新设计来处理这个问题。
您可以创建自己的流组合器来做到这一点,但即使这样也不是那么简单。目前不可能(据我所知)有一个签名可以接受所有借用其参数的异步块。有关详细信息,请参阅使用(可变)借用参数调用通用异步函数。一个可能的部分解决方案是对返回的 future 进行装箱并返回
Pin<Box<dyn Future>>
(以内存分配为代价),或者使用特质魔法来实现,但这仅适用于异步函数,不适用于返回异步块的闭包(至少,并非没有奇怪的签名和不稳定的功能,例如type_alias_impl_trait
和closure_lifetime_binder
)。