如何在 Rust 中的多个任务之间共享和重置异步计时器

问题描述 投票:0回答:1

我正在开发一个可以在多个异步任务之间共享的异步计时器:

use std::{
    future::{self, Future},
    pin::Pin,
    sync::{Arc, Mutex},
    time::Duration,
};
use tokio::time::{self, Instant, Sleep};

struct Foo(Mutex<Pin<Box<Sleep>>>);

impl Foo {
    fn new(sleep: Sleep) -> Self {
        Self(Mutex::new(Box::pin(sleep)))
    }

    async fn sleep(&self) {
        future::poll_fn(|cx| self.0.lock().unwrap().as_mut().poll(cx)).await
    }

    fn reset(&self, deadline: Instant) {
        self.0.lock().unwrap().as_mut().reset(deadline);
    }
}

async fn task1(foo: Arc<Foo>) {
    println!("starting task 1 ...");
    let start = Instant::now();

    foo.sleep().await;

    let time = start.elapsed().as_millis();
    println!("task 1 complete in {time} millis ");
}

async fn task2(foo: Arc<Foo>) {
    println!("starting task 2 ...");
    let start = Instant::now();

    foo.sleep().await;

    let time = start.elapsed().as_millis();
    println!("task 2 complete in {time} millis ");
}

#[tokio::main]
pub async fn main() {
    let sleep = time::sleep(Duration::from_secs(3));
    let foo = Arc::new(Foo::new(sleep));

    let task1 = tokio::spawn(task1(foo.clone()));
    let task2 = tokio::spawn(task2(foo));

    tokio::join!(task1, task2);
}

输出:

starting task 2 ...
starting task 1 ...
task 1 complete in 3005 millis
// stuck here

问题是只有一项任务完成,而另一项任务陷入困境。发生这种情况是否是因为第二个轮询的未来唤醒器覆盖了第一个唤醒器?

我遇到了FutureExt::shared,但它需要未来的所有权。我需要能够在其他 future 等待时重置计时器。

asynchronous rust time async-await rust-tokio
1个回答
0
投票

如果您使用

tokio::sync::Mutex
而不是
std::sync::Mutex
会更有意义,因为您是在 tokio 任务而不是操作系统线程之间共享数据。

问题自行解决。

use std::{
    pin::Pin,
    sync::Arc,
    time::Duration,
};
use tokio::time::{self, Instant, Sleep};
use tokio::sync::Mutex;

struct Foo(Mutex<Pin<Box<Sleep>>>);

impl Foo {
    fn new(sleep: Sleep) -> Self {
        Self(Mutex::new(Box::pin(sleep)))
    }

    async fn sleep(&self) {
        self.0.lock().await.as_mut().await;
    }

    async fn reset(&self, deadline: Instant) {
        self.0.lock().await.as_mut().reset(deadline);
    }
}

async fn task1(foo: Arc<Foo>) {
    println!("starting task 1 ...");
    let start = Instant::now();

    foo.sleep().await;

    let time = start.elapsed().as_millis();
    println!("task 1 complete in {time} millis ");
}

async fn task2(foo: Arc<Foo>) {
    println!("starting task 2 ...");
    let start = Instant::now();

    foo.sleep().await;

    let time = start.elapsed().as_millis();
    println!("task 2 complete in {time} millis ");
}

#[tokio::main]
pub async fn main() {
    let sleep = time::sleep(Duration::from_secs(3));
    let foo = Arc::new(Foo::new(sleep));

    let task1 = tokio::spawn(task1(foo.clone()));
    let task2 = tokio::spawn(task2(foo));

    let _ = tokio::join!(task1, task2);
}
starting task 1 ...
starting task 2 ...
task 1 complete in 3009 millis
task 2 complete in 3009 millis
© www.soinside.com 2019 - 2024. All rights reserved.