"future cannot be sent between threads safely" in Rust


I'm using Rocket + Capnproto. I receive a request through a Web API (Rocket) and have to forward it to another server over RPC (Capnproto).

This is my main Rocket setup:

#[rocket::main]
async fn main() {
    rocket::build()
        .mount("/API", routes![invoke])
        .launch()
        .await
        .ok();
}

This is my route handler:

#[post("/API", data = "<request>")]
async fn invoke(request: api_request::APIRequest<'_>) -> Result<Json<api_response::ApiResponse>, Json<api_response::ApiResponseError>> {
    let result = capnp_rpc::client::run_client(String::from("Hello World")).await;
}

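Here is my Capnproto code. It is pretty much the hello world example they provide. It works fine on its own, but fails as soon as I put it into my Rocket project. As you can see, everything is wrapped in the Tokio library.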

pub async fn run_client( message: String ) ->Result<String, Box<dyn std::error::Error>> {
    let server_addr : String = "127.0.0.1:4000".to_string();
    let addr = server_addr
        .to_socket_addrs().unwrap()
        .next()
        .expect("could not parse address");       
        
        rocket::tokio::task::LocalSet::new()
        .run_until( async move {
            let stream = rocket::tokio::net::TcpStream::connect(&addr).await?;
            stream.set_nodelay(true).unwrap();
            let (reader, writer) =
                tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
            let rpc_network = Box::new(twoparty::VatNetwork::new(
                futures::io::BufReader::new(reader),
                futures::io::BufWriter::new(writer),
                rpc_twoparty_capnp::Side::Client,
                Default::default(),
            ));
            let mut rpc_system = RpcSystem::new(rpc_network, None);
            let hello_world: hello_world::Client =
                rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);

            rocket::tokio::task::spawn_local(rpc_system);           
           
            let mut request = hello_world.say_hello_request();
            request.get().init_request().set_name(&message[..]);

            let reply = request.send().promise.await?;           
            let reply_message  = reply.get()?.get_reply()?.get_message()?.to_str()?;
            println!("received: {}", reply_message);
            Ok(reply_message.to_string())
        }).await

}

This is the full error I get:

error: future cannot be sent between threads safely
   --> src/main.rs:92:1
    |
92  | #[post("/API", data = "<request>")]
    | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `{async block@src/main.rs:92:1: 92:36}`, the trait `std::marker::Send` is not implemented for `Rc<tokio::task::local::Context>`, which is required by `{async block@src/main.rs:92:1: 92:36}: std::marker::Send`
note: future is not `Send` as this value is used across an await
   --> src\infrastructure\capnp_rpc\client.rs:284:16
    |
257 |          rocket::tokio::task::LocalSet::new()
    |          ------------------------------------ has type `LocalSet` which is not `Send`
...
284 |             }).await.unwrap()
    |                ^^^^^ await occurs here, with `rocket::tokio::task::LocalSet::new()` maybe used later
    = note: required for the cast from `Pin<Box<{async block@src/main.rs:92:1: 92:36}>>` to `Pin<Box<dyn futures::Future<Output = Outcome<rocket::Response<'_>, Status, (rocket::Data<'_>, Status)>> + std::marker::Send>>`
    = note: this error originates in the attribute macro `post` (in Nightly builds, run with -Z macro-backtrace for more info)

error: future cannot be sent between threads safely
   --> src/main.rs:92:1
    |
92  | #[post("/API", data = "<request>")]
    | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `{async block@src/main.rs:92:1: 92:36}`, the trait `std::marker::Send` is not implemented for `*const ()`, which is required by `{async block@src/main.rs:92:1: 92:36}: std::marker::Send`
note: future is not `Send` as this value is used across an await
   --> src\infrastructure\capnp_rpc\client.rs:284:16
    |
257 |          rocket::tokio::task::LocalSet::new()
    |          ------------------------------------ has type `LocalSet` which is not `Send`
...
284 |             }).await.unwrap()
    |                ^^^^^ await occurs here, with `rocket::tokio::task::LocalSet::new()` maybe used later
    = note: required for the cast from `Pin<Box<{async block@src/main.rs:92:1: 92:36}>>` to `Pin<Box<dyn futures::Future<Output = Outcome<rocket::Response<'_>, Status, (rocket::Data<'_>, Status)>> + std::marker::Send>>`
    = note: this error originates in the attribute macro `post` (in Nightly builds, run with -Z macro-backtrace for more info)

error: future cannot be sent between threads safely
   --> src/main.rs:92:1
    |
92  | #[post("/API", data = "<request>")]
    | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn StdError`, which is required by `{async block@src/main.rs:92:1: 92:36}: std::marker::Send`
note: future is not `Send` as it awaits another future which is not `Send`
   --> src\infrastructure\capnp_rpc\client.rs:257:10
    |
257 | /          rocket::tokio::task::LocalSet::new()
258 | |             .spawn_local( async move {
259 | |                 let stream = rocket::tokio::net::TcpStream::connect(&addr).await?;
260 | |                 stream.set_nodelay(true).unwrap();
...   |
283 | |                 Ok(reply_message.to_string())
284 | |             }).await.unwrap()
    | |______________^ await occurs here on type `tokio::task::JoinHandle<Result<std::string::String, Box<dyn StdError>>>`, which is not `Send`
    = note: required for the cast from `Pin<Box<{async block@src/main.rs:92:1: 92:36}>>` to `Pin<Box<dyn futures::Future<Output = Outcome<rocket::Response<'_>, Status, (rocket::Data<'_>, Status)>> + std::marker::Send>>`
    = note: this error originates in the attribute macro `post` (in Nightly builds, run with -Z macro-backtrace for more info)

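From what I have found, Rocket has its own worker threads, and this code seems to be trying to start another worker thread. I tried some approaches from another question that used a mutex, but could not get them to work.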

I don't know how to wrap this code so that it works under Rocket's main worker threads.

rust rpc rust-tokio rust-rocket capnproto
1 Answer

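Running a local task (a task that must stay on a single thread) inside a non-local task is essentially impossible, because the owning task may be moved between threads and would take the local task with it.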
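To make the Send requirement concrete, here is a minimal standard-library sketch. It is only an analogy and uses neither Rocket nor capnp-rpc: the function names are made up for illustration. A value that is Send (Arc) can be handed to another thread, while a value that is not Send (Rc, the same kind of type the compiler complains about inside LocalSet) has to stay where it was created.

```rust
use std::rc::Rc;
use std::sync::Arc;
use std::thread;

/// `Arc<Vec<u32>>` is `Send`, so the closure that owns it may run on another thread.
fn sum_on_other_thread(data: Arc<Vec<u32>>) -> u32 {
    thread::spawn(move || data.iter().sum::<u32>())
        .join()
        .expect("worker thread panicked")
}

/// `Rc<Vec<u32>>` is not `Send`; it has to be used on the thread that created it.
fn sum_on_this_thread(data: Rc<Vec<u32>>) -> u32 {
    data.iter().sum()
}

// Moving the `Rc` into `thread::spawn` is rejected at compile time,
// because `Rc<Vec<u32>>` cannot be sent between threads safely:
//
//     let data = Rc::new(vec![1, 2, 3]);
//     thread::spawn(move || data.iter().sum::<u32>());
```

A Rocket handler is in the position of the closure passed to thread::spawn: the runtime may resume it on a different worker thread after any await, so everything it holds across that await has to be Send.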

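Since all Rocket requests are non-local tasks, they cannot run capnproto's code. The only option left is the main task. It needs to manage an event loop, spawning capnproto tasks while also running the application. Other tasks communicate with it through channels.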

Here is the skeleton:

use std::future::Future;
use std::pin::Pin;

use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::LocalSet;

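// `Send` on the closure lets `mpsc::Sender<Job>` be shared with Rocket's handlers
// (for example through `State`); the future it returns may still be non-`Send`.
type Job = Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = ()>>> + Send>;

async fn send_job<Fut>(tx: &mpsc::Sender<Job>, job: impl FnOnce() -> Fut + Send + 'static) -> Fut::Output
where
    Fut: Future + 'static,
    Fut::Output: Send + 'static,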
{
    let (result_tx, result_rx) = oneshot::channel();
    tx.send(Box::new(move || {
        let fut = job();
        Box::pin(async move {
            let result = fut.await;
            result_tx.send(result).unwrap_or_else(|_| panic!());
        })
    }))
    .await
    .unwrap();
    result_rx.await.unwrap()
}

#[tokio::main]
async fn main() {
    let local_set = LocalSet::new();
    local_set
        .run_until(async {
            // Choose a reasonable number here. If we are at the limit,
            // that means requests are coming too fast and we need to throttle them.
            let (tx, mut rx) = mpsc::channel::<Job>(100);

            local_set.spawn_local(async move {
                loop {
                    let job = rx.recv().await.expect("no sender?");
                    tokio::task::spawn_local(job());
                }
            });

            main_work(tx).await;
        })
        .await;
}

async fn main_work(tx: mpsc::Sender<Job>) {}

Launch Rocket inside main_work(). tx needs to be available to all requests; use State for that.

Then, when a request needs to run a capnproto task, it calls:

let result = send_job(&tx, move || async move {
    // Code here.
})
.await;

This design uses only one core to run capnproto tasks, which limits scalability, but that follows directly from the limitations of the capnp-rpc crate.
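The shape of this design (one owner of the thread-bound state, everyone else talking to it over channels) can also be sketched with nothing but the standard library. This is an illustration under assumptions, not the tokio skeleton itself: LocalClient is a hypothetical stand-in for the capnp-rpc client, a plain thread plays the role of the LocalSet, and std::sync::mpsc replaces the tokio channels.

```rust
use std::rc::Rc;
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a client that must stay on one thread.
struct LocalClient {
    greeting: String,
}

impl LocalClient {
    fn say_hello(&self, name: &str) -> String {
        format!("{}, {}!", self.greeting, name)
    }
}

/// A job is `Send` so it can cross the channel; the `Rc` it receives is not.
type Job = Box<dyn FnOnce(&Rc<LocalClient>) + Send>;

/// Starts the single worker thread and returns the sending half of its job queue.
fn spawn_worker() -> mpsc::Sender<Job> {
    let (tx, rx) = mpsc::channel::<Job>();
    thread::spawn(move || {
        // Created on the worker thread and never moved off it.
        let client = Rc::new(LocalClient { greeting: "Hello".to_string() });
        // The loop ends once every `Sender` has been dropped.
        for job in rx {
            job(&client);
        }
    });
    tx
}

/// Runs `job` on the worker thread and blocks until its result comes back.
fn send_job<R, F>(tx: &mpsc::Sender<Job>, job: F) -> R
where
    F: FnOnce(&Rc<LocalClient>) -> R + Send + 'static,
    R: Send + 'static,
{
    let (result_tx, result_rx) = mpsc::channel();
    tx.send(Box::new(move |client: &Rc<LocalClient>| {
        // The caller may have stopped waiting; ignore a closed result channel.
        let _ = result_tx.send(job(client));
    }))
    .expect("worker thread has stopped");
    result_rx.recv().expect("worker dropped the job without replying")
}
```

A caller on any thread would write something like send_job(&tx, |client| client.say_hello("World")). Every job runs one after another on the single worker thread, which mirrors the single-core limitation described above.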
