美好的一天,我正在尝试使用 tokio 为流编写一个简单而通用的演员特征。 我无法在一个任务(StreamActor::run)中同时收听流和 mpsc 接收器。
我用宏 tokio::select 试了一下。这是我的尝试:
#[async_trait]
trait StreamActor<S>
where
Self: Sized + Sync + Send + 'static,
S: Stream + Unpin + Send + 'static,
{
type Message: Send + Debug;
async fn run(&mut self, mut ctx: Context<Self, S>) -> Result<()> {
info!("started");
self.initialize(&mut ctx).await?;
loop {
tokio::select! {
Some(msg) = ctx.receiver.recv() => {
self.handle_actor_message(msg, &mut ctx).await?
},
Some(msg) = ctx.stream.next() => {
self.handle_stream_message(msg, &mut ctx).await?
},
else => {
ctx.receiver.close();
break;
}
}
}
self.finalize(&mut ctx).await?;
info!("ended");
Ok(())
}
async fn handle_actor_message(
&mut self,
msg: Self::Message,
ctx: &mut Context<Self, S>,
) -> Result<()>;
async fn handle_stream_message(
&mut self,
msg: S::Item,
ctx: &mut Context<Self, S>,
) -> Result<()>;
async fn initialize(&mut self, _: &mut Context<Self, S>) -> Result<()> {
Ok(())
}
async fn finalize(&mut self, _: &mut Context<Self, S>) -> Result<()> {
Ok(())
}
}
完整代码:Rust Playground(遗憾的是由于依赖关系无法执行)
编译器错误:
error: future cannot be sent between threads safely
--> src\main.rs:73:70
|
73 | async fn run(&mut self, mut ctx: Context<Self, S>) -> Result<()> {
| ______________________________________________________________________^
74 | | info!("started");
75 | | self.initialize(&mut ctx).await?;
76 | |
... |
95 | | Ok(())
96 | | }
| |_____^ future created by async block is not `Send`
|
= help: within `impl futures::Future<Output = Result<(), anyhow::Error>>`, the trait `std::marker::Send` is not implemented for `<S as Stream>::Item`
note: future is not `Send` as this value is used across an await
--> src\main.rs:83:62
|
78 | / tokio::select! {
79 | | Some(msg) = ctx.receiver.recv() => {
80 | | self.handle_actor_message(msg, &mut ctx).await?
81 | | },
82 | | Some(msg) = ctx.stream.next() => {
| | --- has type `<S as Stream>::Item` which is not `Send`
83 | | self.handle_stream_message(msg, &mut ctx).await?
| | ^^^^^^ await occurs here, with `msg` maybe used later
... |
88 | | }
89 | | }
| |_____________- `msg` is later dropped here
= note: required for the cast from `impl futures::Future<Output = Result<(), anyhow::Error>>` to the object type `dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send`
据我所知,这里的问题是流项目 (S::Item) 没有“发送”特性,但需要它,因为我需要等待异步消息处理程序,所以它可以跨线程发送。我不知道如何以及是否可以将特征的关联类型限制为“发送”。如果我将通用类型 S 替换为具体类型,例如在我的情况下“SplitStream
但是我也需要其他流的演员特征,所以我的问题是,有没有一种方法可以使通用方法起作用,如果可以的话怎么办?
只需在正确的位置添加绑定
S::Item: Send
:
#[async_trait]
trait StreamActor<S>
where
Self: Sized + Sync + Send + 'static,
S: Stream + Unpin + Send + 'static,
S::Item: Send,
{ ... }
这会导致一些级联错误,您需要在其他几个地方添加
S::Item: Send
来修复它们。