学习Rust tokio多线程时发现一个现象,不知道为什么。
这是代码。
use std::{thread, time::Duration};
use chrono::Local;
fn now() -> String {
Local::now().format("%F %T").to_string()
}
async fn async_task(i: u64) {
thread::sleep(Duration::from_secs(i));
println!("{}.async task {}!", now(), i);
}
#[tokio::main]
async fn main() {
for i in 0..10 {
tokio::spawn(async_task(i));
}
println!("{}.main thread", now());
}
我运行代码,发现每10个异步任务都被执行了。结果如下
2023-09-05 22:08:05.async task 0!
2023-09-05 22:08:05.main thread
2023-09-05 22:08:06.async task 1!
2023-09-05 22:08:07.async task 2!
2023-09-05 22:08:08.async task 3!
2023-09-05 22:08:09.async task 4!
2023-09-05 22:08:10.async task 5!
2023-09-05 22:08:11.async task 6!
2023-09-05 22:08:12.async task 7!
2023-09-05 22:08:13.async task 8!
2023-09-05 22:08:14.async task 9!
当我注释掉
println
线程中的main
代码时,只有少数任务会被执行。
#[tokio::main]
async fn main() {
for i in 0..10 {
tokio::spawn(async_task(i));
}
// println!("{}.main thread", now());
}
结果如下
2023-09-05 22:10:51.async task 0!
2023-09-05 22:10:52.async task 1!
我已经尝试过很多次了,每次都会出现这种差异。每次尝试时,未注释掉的代码
println
都会执行所有异步任务,而另一个则不会。
我真的不明白为什么一个
println
可以产生如此大的影响。如果有人能提供帮助,我将不胜感激。
您的代码行为怪异的真正原因是您使用
std::thread::sleep
而不是 tokio::time::sleep
。
使用异步函数时,“永远不要”阻止这一点很重要。异步反应器是“非抢占式”的,这意味着它们只能在“.await
”点的任务之间进行调度。这意味着,如果您std::thread::sleep
,您就会阻止整个程序。这也是为什么在您的工作程序中,输出并不总是以正确的顺序打印,尽管每个输出之间的时间应该是整整一秒。
std::thread::sleep
替换为 tokio::time::sleep
,您将获得一致的行为(尽管这可能不是您想要的行为):
use std::time::Duration;
use chrono::Local;
fn now() -> String {
Local::now().format("%F %T").to_string()
}
async fn async_task(i: u64) {
tokio::time::sleep(Duration::from_secs(i)).await;
println!("{}.async task {}!", now(), i);
}
#[tokio::main]
async fn main() {
for i in 0..10 {
tokio::spawn(async_task(i));
}
println!("{}.main thread", now());
tokio::time::sleep(Duration::from_millis(2500)).await;
}
2023-09-05 16:39:25.main thread
2023-09-05 16:39:25.async task 0!
2023-09-05 16:39:26.async task 1!
2023-09-05 16:39:27.async task 2!
为什么会这样?因为当
main
完成时,tokio 会取消所有剩余任务。同样,tokio 只能在
.await
点进行安排,因此在您的“工作”示例中,tokio 仍然尝试取消任务,但没有成功,因为它们从未达到
.await
点。解决此问题的最简单方法是不让 main
函数结束。 (其实这是我想到的唯一方法)
有多种方法可以实现这一目标。然而,就您而言,在我看来,您的意图是等待所有子任务完成,然后结束程序。
这可以通过等待连接句柄来完成:use std::time::Duration;
use chrono::Local;
fn now() -> String {
Local::now().format("%F %T").to_string()
}
async fn async_task(i: u64) {
tokio::time::sleep(Duration::from_secs(i)).await;
println!("{}.async task {}!", now(), i);
}
#[tokio::main]
async fn main() {
let mut joinhandles = Vec::new();
for i in 0..10 {
joinhandles.push(tokio::spawn(async_task(i)));
}
println!("{}.main thread", now());
for joinhandle in joinhandles {
joinhandle.await.unwrap();
}
}
2023-09-05 16:47:23.main thread
2023-09-05 16:47:23.async task 0!
2023-09-05 16:47:24.async task 1!
2023-09-05 16:47:25.async task 2!
2023-09-05 16:47:26.async task 3!
2023-09-05 16:47:27.async task 4!
2023-09-05 16:47:28.async task 5!
2023-09-05 16:47:29.async task 6!
2023-09-05 16:47:30.async task 7!
2023-09-05 16:47:31.async task 8!
2023-09-05 16:47:32.async task 9!
一一等待它们有点乏味,而且也可以防止立即传播错误。相反,您可以使用
futures::future::try_join_all
use std::time::Duration;
use chrono::Local;
fn now() -> String {
Local::now().format("%F %T").to_string()
}
async fn async_task(i: u64) {
tokio::time::sleep(Duration::from_secs(i)).await;
println!("{}.async task {}!", now(), i);
}
#[tokio::main]
async fn main() {
// Same as the for-loop in the earlier example, just more compact and functional
let joinhandles = (0..10)
.map(|i| tokio::spawn(async_task(i)))
.collect::<Vec<_>>();
println!("{}.main thread", now());
futures::future::try_join_all(joinhandles).await.unwrap();
}
2023-09-05 16:56:14.main thread
2023-09-05 16:56:14.async task 0!
2023-09-05 16:56:15.async task 1!
2023-09-05 16:56:16.async task 2!
2023-09-05 16:56:17.async task 3!
2023-09-05 16:56:18.async task 4!
2023-09-05 16:56:19.async task 5!
2023-09-05 16:56:20.async task 6!
2023-09-05 16:56:21.async task 7!
2023-09-05 16:56:22.async task 8!
2023-09-05 16:56:23.async task 9!