我正在使用 Rocket + Capnproto,我收到来自 Web API(Rocket)的请求,并且必须通过 RPC(Capnproto)将其发送到另一台服务器
这是我的主要火箭设置
#[rocket::main]
async fn main() {
rocket::build()
.mount("/API",routes![invoke])
.launch()
.await.ok();
}
这是我的路线方法
#[post("/API", data = "<request>")]
async fn invoke(request: api_request::APIRequest<'_>)Result<Json<api_response::ApiResponse>, Json<api_response::ApiResponseError>>{
let result = capnp_rpc::client::run_client(String::from("Hello World")).await;
}
这里是我的 Capnproto 代码,这几乎是他们拥有的 hello world 示例,它单独工作正常,但一旦我将它放在我的 Rocket 项目上就会失败。正如你所看到的,所有内容都包含在 Tokio 库中。
pub async fn run_client( message: String ) ->Result<String, Box<dyn std::error::Error>> {
let server_addr : String = "127.0.0.1:4000".to_string();
let addr = server_addr
.to_socket_addrs().unwrap()
.next()
.expect("could not parse address");
rocket::tokio::task::LocalSet::new()
.run_until( async move {
let stream = rocket::tokio::net::TcpStream::connect(&addr).await?;
stream.set_nodelay(true).unwrap();
let (reader, writer) =
tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
let rpc_network = Box::new(twoparty::VatNetwork::new(
futures::io::BufReader::new(reader),
futures::io::BufWriter::new(writer),
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
let mut rpc_system = RpcSystem::new(rpc_network, None);
let hello_world: hello_world::Client =
rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
rocket::tokio::task::spawn_local(rpc_system);
let mut request = hello_world.say_hello_request();
request.get().init_request().set_name(&message[..]);
let reply = request.send().promise.await?;
let reply_message = reply.get()?.get_reply()?.get_message()?.to_str()?;
println!("received: {}", reply_message);
Ok(reply_message.to_string())
}).await
}
这是我收到的完整错误
error: future cannot be sent between threads safely
--> src/main.rs:92:1
|
92 | #[post("/API", data = "<request>")]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `{async block@src/main.rs:92:1: 92:36}`, the trait `std::marker::Send` is not implemented for `Rc<tokio::task::local::Context>`, which is required by `{async block@src/main.rs:92:1: 92:36}: std::marker::Send`
note: future is not `Send` as this value is used across an await
--> src\infrastructure\capnp_rpc\client.rs:284:16
|
257 | rocket::tokio::task::LocalSet::new()
| ------------------------------------ has type `LocalSet` which is not `Send`
...
284 | }).await.unwrap()
| ^^^^^ await occurs here, with `rocket::tokio::task::LocalSet::new()` maybe used later
= note: required for the cast from `Pin<Box<{async block@src/main.rs:92:1: 92:36}>>` to `Pin<Box<dyn futures::Future<Output = Outcome<rocket::Response<'_>, Status, (rocket::Data<'_>, Status)>> + std::marker::Send>>`
= note: this error originates in the attribute macro `post` (in Nightly builds, run with -Z macro-backtrace for more info)
error: future cannot be sent between threads safely
--> src/main.rs:92:1
|
92 | #[post("/API", data = "<request>")]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `{async block@src/main.rs:92:1: 92:36}`, the trait `std::marker::Send` is not implemented for `*const ()`, which is required by `{async block@src/main.rs:92:1: 92:36}: std::marker::Send`
note: future is not `Send` as this value is used across an await
--> src\infrastructure\capnp_rpc\client.rs:284:16
|
257 | rocket::tokio::task::LocalSet::new()
| ------------------------------------ has type `LocalSet` which is not `Send`
...
284 | }).await.unwrap()
| ^^^^^ await occurs here, with `rocket::tokio::task::LocalSet::new()` maybe used later
= note: required for the cast from `Pin<Box<{async block@src/main.rs:92:1: 92:36}>>` to `Pin<Box<dyn futures::Future<Output = Outcome<rocket::Response<'_>, Status, (rocket::Data<'_>, Status)>> + std::marker::Send>>`
= note: this error originates in the attribute macro `post` (in Nightly builds, run with -Z macro-backtrace for more info)
error: future cannot be sent between threads safely
--> src/main.rs:92:1
|
92 | #[post("/API", data = "<request>")]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn StdError`, which is required by `{async block@src/main.rs:92:1: 92:36}: std::marker::Send`
note: future is not `Send` as it awaits another future which is not `Send`
--> src\infrastructure\capnp_rpc\client.rs:257:10
|
257 | / rocket::tokio::task::LocalSet::new()
258 | | .spawn_local( async move {
259 | | let stream = rocket::tokio::net::TcpStream::connect(&addr).await?;
260 | | stream.set_nodelay(true).unwrap();
... |
283 | | Ok(reply_message.to_string())
284 | | }).await.unwrap()
| |______________^ await occurs here on type `tokio::task::JoinHandle<Result<std::string::String, Box<dyn StdError>>>`, which is not `Send`
= note: required for the cast from `Pin<Box<{async block@src/main.rs:92:1: 92:36}>>` to `Pin<Box<dyn futures::Future<Output = Outcome<rocket::Response<'_>, Status, (rocket::Data<'_>, Status)>> + std::marker::Send>>`
= note: this error originates in the attribute macro `post` (in Nightly builds, run with -Z macro-backtrace for more info)
根据我的调查,Rocket 有自己的威胁工作者,并且该代码似乎正在尝试启动另一个威胁工作者。我在尝试使用互斥体的另一个问题中尝试了一些方法,但无法使其工作。
不知道如何包装此代码,以便它可以在 Rocket 主要威胁工作人员下工作。
在非本地任务中运行本地任务(必须在同一线程上运行的任务)基本上是不可能的,因为所有者任务可能会在线程之间移动并随之移动本地任务。
由于所有
rocket
请求都是非本地任务,这意味着它们无法运行 capnproto 的代码。剩下的唯一选择就是主线任务。它将需要管理事件循环,在运行应用程序的同时生成 capnproto 任务。其他任务将通过通道与其进行通信。
这是骨架:
use std::future::Future;
use std::pin::Pin;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::LocalSet;
type Job = Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = ()>>>>;
async fn send_job<Fut>(tx: &mpsc::Sender<Job>, job: impl FnOnce() -> Fut + 'static) -> Fut::Output
where
Fut: Future + 'static,
Fut::Output: 'static,
{
let (result_tx, result_rx) = oneshot::channel();
tx.send(Box::new(move || {
let fut = job();
Box::pin(async move {
let result = fut.await;
result_tx.send(result).unwrap_or_else(|_| panic!());
})
}))
.await
.unwrap();
result_rx.await.unwrap()
}
#[tokio::main]
async fn main() {
let local_set = LocalSet::new();
local_set
.run_until(async {
// Choose a reasonable number here. If we are at the limit,
// that means requests are coming too fast and we need to throttle them.
let (tx, mut rx) = mpsc::channel::<Job>(100);
local_set.spawn_local(async move {
loop {
let job = rx.recv().await.expect("no sender?");
tokio::task::spawn_local(job());
}
});
main_work(tx).await;
})
.await;
}
async fn main_work(tx: mpsc::Sender<Job>) {}
在
main_work()
内启动 rocket
。 tx
需要可用于所有请求,请使用 State
。
然后,当某个请求需要运行 capnproto 任务时,它会调用:
let result = send_job(&tx, move || async move {
// Code here.
})
.await;
这种设计仅使用一个核心来执行 capnproto 任务,从而限制了可扩展性,但这直接来自于
capnp-rpc
板条箱的限制。