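I have a Rust application that is basically an axum web server with a few routes.
Now I also need to periodically check a database table for new rows.
If there are any, I have to run some heavy computations that can take several minutes (especially in a small Docker container).
This is the code I'm using, and this is its output: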
The main thread should not be blocked and should print every second: Instant { t: 6479.5889821s }
The main thread should not be blocked and should print every second: Instant { t: 6480.5996495s }
The main thread should not be blocked and should print every second: Instant { t: 6481.6152853s }
Starting an heavy CPU computation...
The main thread should not be blocked and should print every second: Instant { t: 6502.5748215s }
The main thread should not be blocked and should print every second: Instant { t: 6503.5917731s }
The main thread should not be blocked and should print every second: Instant { t: 6504.5990575s }
As you can see, the job that prints "Starting an heavy CPU computation..." blocks the main thread.
Is there a way to avoid this?
Should I use tokio::task::spawn_blocking() for every heavy job?
Or can I launch the whole "worker" in a separate thread instead? I ask because I have many different jobs.
I mean this code in main():
let worker = Queue::new();
tokio::spawn(async move { worker.run().await }); // Is there a way to launch this in a separate thread?
The full code is on the Rust Playground and on Rust Explorer, and also here:
use futures::StreamExt;
use rand::Rng;
use std::time::{Duration, Instant};

const CONCURRENCY: usize = 5;

struct Job {
    id: u16,
}

struct Queue {
    // some needed fields like DB connection
}

impl Queue {
    fn new() -> Self {
        Self {}
    }

    async fn run(&self) {
        loop {
            // I'll get jobs from the DB here; for this demo they are randomly generated
            let mut jobs: Vec<Job> = Vec::new();
            for _ in 0..2 {
                jobs.push(Job {
                    id: get_random_id(),
                })
            }

            futures::stream::iter(jobs)
                .for_each_concurrent(CONCURRENCY, |job| async {
                    match self.handle_job(job).await {
                        Ok(_) => {
                            // I will remove the job from queue
                        }
                        Err(_) => {
                            // I will handle this error
                        }
                    };
                })
                .await;

            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    }

    async fn handle_job(&self, job: Job) -> Result<(), String> {
        if job.id % 2 == 0 {
            println!("Starting an heavy CPU computation...");
            // I'm simulating a heavy CPU computation by blocking the thread with this sleep
            std::thread::sleep(Duration::from_secs(10));
            // I think I can use spawn_blocking instead, right?
            // tokio::task::spawn_blocking(move || {
            //     std::thread::sleep(Duration::from_secs(8));
            // }).await.unwrap()
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let worker = Queue::new();
    // Can I start the below worker.run() in a separate thread?
    tokio::spawn(async move { worker.run().await });

    loop {
        println!(
            "The main thread should not be blocked and should print every second: {:?}",
            Instant::now()
        );
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

fn get_random_id() -> u16 {
    let mut rng = rand::thread_rng();
    rng.gen::<u16>()
}
Yes, you should use spawn_blocking. It ensures that the async loop in main is not blocked by the CPU-intensive code.
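Tokio keeps a separate pool of blocking threads for exactly this purpose (up to 512 by default, configurable with the runtime builder's max_blocking_threads). Spawning CPU-heavy tasks into it without any bound could become a problem, but you are already capping parallelism yourself with for_each_concurrent, so that is not a concern here.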