在带有 boost asio 的 C++ 中,您可以通过调用
io_context.run()
(阻塞)或 io_context.poll()
(非阻塞)在异步上下文上取得进展。在 Rust 中,await
的行为与 asio run
类似。我想了解 asio poll
的 Rust 版本是什么。
假设我有一些永远循环的异步函数。以下代码按预期工作:
use tokio::time::Duration;
async fn some_async_function() {
loop {
println!("Before await");
tokio::time::sleep(Duration::from_secs(2)).await;
println!("After await");
}
}
#[tokio::main(flavor = "current_thread")]
async fn main() {
let local = tokio::task::LocalSet::new();
local.spawn_local(some_async_function());
local.spawn_local(some_async_function());
local.await;
}
我希望能够通过以下方式重写主函数:
#[tokio::main(flavor = "current_thread")]
async fn main() {
let local = tokio::task::LocalSet::new();
local.spawn_local(some_async_function());
local.spawn_local(some_async_function());
loop {
local.poll(); // make progress on async tasks without blocking
// otherwise, execute some logic like processing a synchronous queue of messages
}
}
延迟很重要。我希望代码保持单线程。
我不确定我是否正确使用了
LocalSet
,所以我很乐意在必要时更改该部分。
如果:
async
将消息推送到队列的任务,tokio::select
的biased模式来实现你想要的,将队列处理放在第一位:
use tokio::sync::mpsc;
async fn push_events (id: i32, tx: mpsc::Sender<(i32, i32)>) {
for i in 0..10 {
println!("Task #{id} sending {i}");
tx.send ((id, i)).await.unwrap();
tokio::task::yield_now().await;
}
}
#[tokio::main (flavor = "current_thread")]
async fn main() {
let (tx, mut rx) = mpsc::channel (1);
let task0 = push_events (0, { let tx = tx.clone(); tx });
let task1 = push_events (1, { let tx = tx.clone(); tx });
tokio::pin!(task0);
tokio::pin!(task1);
loop {
tokio::select!{
biased;
Some ((id, i)) = rx.recv() => {
println!("Received {i} from task #{id}");
},
_ = &mut task0 => {},
_ = &mut task1 => {},
}
}
}
注意消息是如何立即处理的,而如果您在第 22 行注释了
biased
,则发送任务可能会在处理多条消息之前尝试将其推送到队列。