如何在 Rust 中的多个任务之间共享和重置异步计时器 (tokio::time::Sleep)

问题描述 投票:0回答:2

我正在开发一个可以在多个异步任务之间共享的异步计时器:

use std::{
    future::{self, Future},
    pin::Pin,
    sync::{Arc, Mutex},
    time::Duration,
};
use tokio::time::{self, Instant, Sleep};

struct Foo(Mutex<Pin<Box<Sleep>>>);

impl Foo {
    fn new(sleep: Sleep) -> Self {
        Self(Mutex::new(Box::pin(sleep)))
    }

    async fn sleep(&self) {
        future::poll_fn(|cx| self.0.lock().unwrap().as_mut().poll(cx)).await
    }

    fn reset(&self, deadline: Instant) {
        self.0.lock().unwrap().as_mut().reset(deadline);
    }
}

async fn task1(foo: Arc<Foo>) {
    println!("starting task 1 ...");
    let start = Instant::now();

    foo.sleep().await;

    let time = start.elapsed().as_millis();
    println!("task 1 complete in {time} millis ");
}

async fn task2(foo: Arc<Foo>) {
    println!("starting task 2 ...");
    let start = Instant::now();

    foo.sleep().await;

    let time = start.elapsed().as_millis();
    println!("task 2 complete in {time} millis ");
}

#[tokio::main]
pub async fn main() {
    let sleep = time::sleep(Duration::from_secs(3));
    let foo = Arc::new(Foo::new(sleep));

    let task1 = tokio::spawn(task1(foo.clone()));
    let task2 = tokio::spawn(task2(foo));

    tokio::join!(task1, task2);
}

输出:

starting task 2 ...
starting task 1 ...
task 1 complete in 3005 millis
// stuck here

问题是只有一项任务完成,而另一项任务陷入困境。发生这种情况是否是因为第二个轮询的未来唤醒器覆盖了第一个唤醒器?

我遇到了FutureExt::shared,但它需要未来的所有权。我需要能够在其他 future 等待时重置计时器。

asynchronous rust time async-await rust-tokio
2个回答
1
投票

如果您使用

tokio::sync::Mutex
而不是
std::sync::Mutex
会更有意义,因为您是在 tokio 任务而不是操作系统线程之间共享数据。

问题自行解决。

use std::{
    pin::Pin,
    sync::Arc,
    time::Duration,
};
use tokio::time::{self, Instant, Sleep};
use tokio::sync::Mutex;

struct Foo(Mutex<Pin<Box<Sleep>>>);

impl Foo {
    fn new(sleep: Sleep) -> Self {
        Self(Mutex::new(Box::pin(sleep)))
    }

    async fn sleep(&self) {
        self.0.lock().await.as_mut().await;
    }

    async fn reset(&self, deadline: Instant) {
        self.0.lock().await.as_mut().reset(deadline);
    }
}

async fn task1(foo: Arc<Foo>) {
    println!("starting task 1 ...");
    let start = Instant::now();

    foo.sleep().await;

    let time = start.elapsed().as_millis();
    println!("task 1 complete in {time} millis ");
}

async fn task2(foo: Arc<Foo>) {
    println!("starting task 2 ...");
    let start = Instant::now();

    foo.sleep().await;

    let time = start.elapsed().as_millis();
    println!("task 2 complete in {time} millis ");
}

#[tokio::main]
pub async fn main() {
    let sleep = time::sleep(Duration::from_secs(3));
    let foo = Arc::new(Foo::new(sleep));

    let task1 = tokio::spawn(task1(foo.clone()));
    let task2 = tokio::spawn(task2(foo));

    let _ = tokio::join!(task1, task2);
}
starting task 1 ...
starting task 2 ...
task 1 complete in 3009 millis
task 2 complete in 3009 millis

1
投票

经过一番挖掘,我发现

Sleep
的实例只能容纳单个异步任务唤醒器:

https://github.com/tokio-rs/tokio/blob/21df16d7595880247642c4fb38f1c365a49de75b/tokio/src/runtime/time/entry.rs#L102

poll
上的每次调用
Sleep
都会覆盖之前轮询注册的旧唤醒器,因此需要手动将就绪状态传播到所有唤醒器:

struct Foo(Mutex<FooInner>);

struct FooInner {
    sleep: Pin<Box<Sleep>>,
    wakers: Vec<Waker>,
}

impl Foo {
    fn new(sleep: Sleep) -> Self {
        Self(Mutex::new(FooInner {
            sleep: Box::pin(sleep),
            wakers: Vec::new(),
        }))
    }

    async fn sleep(&self) {
        future::poll_fn(|cx| {
            let mut inner = self.0.lock().unwrap();

            match inner.sleep.as_mut().poll(cx) {
                Poll::Ready(()) => {
                    // propagate the ready state to all wakers
                    for waker in inner.wakers.drain(..) {
                        waker.wake();
                    }

                    Poll::Ready(())
                }
                Poll::Pending => {
                    inner.wakers.push(cx.waker().clone());
                    Poll::Pending
                }
            }
        })
        .await
    }

    fn reset(&self, deadline: Instant) {
        let mut inner = self.0.lock().unwrap();

        inner.sleep.as_mut().reset(deadline);

        // deadline might have been reset to an earlier time
        // wake up all wakers to re-evaluate the new deadline
        for waker in inner.wakers.drain(..) {
            waker.wake();
        }
    }
}

正如@啊鹿Dizzyi提到的,切换到

tokio::sync::Mutex
可以解决问题。这是因为锁保证只有一个任务可以注册其唤醒程序,因为在解析
Sleep
之前其他任务无法访问内部
Sleep
。换句话说,
tokio::sync::Mutex
有助于传播就绪状态。

但是,为了实现 FIFO 基础,

tokio::sync::Mutex
是使用信号量实现的,信号量基本上是
std::sync::Mutex<LinkedList<Waker>>
。如果任务的完成顺序不需要是 FIFO,手动将唤醒器存储在
Vec
Slab
中可以提高性能。

© www.soinside.com 2019 - 2024. All rights reserved.