带有 tokio 的简单通用流演员

问题描述 投票:0回答:1

美好的一天,我正在尝试使用 tokio 为流编写一个简单而通用的演员特征。 我无法在一个任务(StreamActor::run)中同时收听流和 mpsc 接收器。

我用宏 tokio::select 试了一下。这是我的尝试:

#[async_trait]
trait StreamActor<S>
where
    Self: Sized + Sync + Send + 'static,
    S: Stream + Unpin + Send + 'static,
{
    type Message: Send + Debug;

    async fn run(&mut self, mut ctx: Context<Self, S>) -> Result<()> {
        info!("started");
        self.initialize(&mut ctx).await?;

        loop {
            tokio::select! {
                Some(msg) = ctx.receiver.recv() => {
                    self.handle_actor_message(msg, &mut ctx).await?
                },
                Some(msg) = ctx.stream.next() => {
                    self.handle_stream_message(msg, &mut ctx).await?
                },
                else => {
                    ctx.receiver.close();
                    break;
                }
            }
        }

        self.finalize(&mut ctx).await?;
        info!("ended");

        Ok(())
    }

    async fn handle_actor_message(
        &mut self,
        msg: Self::Message,
        ctx: &mut Context<Self, S>,
    ) -> Result<()>;

    async fn handle_stream_message(
        &mut self,
        msg: S::Item,
        ctx: &mut Context<Self, S>,
    ) -> Result<()>;

    async fn initialize(&mut self, _: &mut Context<Self, S>) -> Result<()> {
        Ok(())
    }

    async fn finalize(&mut self, _: &mut Context<Self, S>) -> Result<()> {
        Ok(())
    }
}

完整代码:Rust Playground(遗憾的是由于依赖关系无法执行)

编译器错误:

error: future cannot be sent between threads safely
  --> src\main.rs:73:70
   |
73 |       async fn run(&mut self, mut ctx: Context<Self, S>) -> Result<()> {
   |  ______________________________________________________________________^
74 | |         info!("started");
75 | |         self.initialize(&mut ctx).await?;
76 | |
...  |
95 | |         Ok(())
96 | |     }
   | |_____^ future created by async block is not `Send`
   |
   = help: within `impl futures::Future<Output = Result<(), anyhow::Error>>`, the trait `std::marker::Send` is not implemented for `<S as Stream>::Item`
note: future is not `Send` as this value is used across an await
  --> src\main.rs:83:62
   |
78 | /             tokio::select! {
79 | |                 Some(msg) = ctx.receiver.recv() => {
80 | |                     self.handle_actor_message(msg, &mut ctx).await?
81 | |                 },
82 | |                 Some(msg) = ctx.stream.next() => {
   | |                      --- has type `<S as Stream>::Item` which is not `Send`
83 | |                     self.handle_stream_message(msg, &mut ctx).await?
   | |                                                              ^^^^^^ await occurs here, with `msg` maybe used later
...  |
88 | |                 }
89 | |             }
   | |_____________- `msg` is later dropped here
   = note: required for the cast from `impl futures::Future<Output = Result<(), anyhow::Error>>` to the object type `dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send`

据我所知,这里的问题是流项目 (S::Item) 没有“发送”特性,但需要它,因为我需要等待异步消息处理程序,所以它可以跨线程发送。我不知道如何以及是否可以将特征的关联类型限制为“发送”。如果我将通用类型 S 替换为具体类型,例如在我的情况下“SplitStream”一切正常。

但是我也需要其他流的演员特征,所以我的问题是,有没有一种方法可以使通用方法起作用,如果可以的话怎么办?

asynchronous rust stream actor rust-tokio
1个回答
0
投票

只需在正确的位置添加绑定

S::Item: Send

#[async_trait]
trait StreamActor<S>
where
    Self: Sized + Sync + Send + 'static,
    S: Stream + Unpin + Send + 'static,
    S::Item: Send,
{ ... }

这会导致一些级联错误,您需要在其他几个地方添加

S::Item: Send
来修复它们。

© www.soinside.com 2019 - 2024. All rights reserved.