Rust 在不使用 Arc/Mutex 的情况下从多个线程访问对象

问题描述 投票:0回答:1

我正在尝试用 Rust 编写一个下载器。我面临的问题是我正在尝试实现一个暂停功能,以便下载方法可以继续进行,并且可以调用另一个暂停方法来中断下载。不幸的是,使用

Arc
Mutex
不起作用,因为下载将锁定对象,直到下载进度完成。

这是一个最小的例子:

struct Downloader {
    status : DownloadStatus,
    // some object storing download of file as bytes
}

impl Downloader{

    pub fn pause (self : &mut Downloader) -> () {
        self.status = DownloadStatus::Paused;
    }
    pub async fn download (self : &mut Downloader) -> () {
        self.status = DownloadStatus::Downloading;
        // mimic downloading of a file
        loop {
            match self.status {
                DownloadStatus::Paused => {break;},
                _ => {}
            }
            // downloading ..
        }
    }
}


#[tokio::main]
async fn main() {
    let downloader = Downloader {
        status: DownloadStatus::Idle
    };

    let download_task = tokio::spawn({
        async {
            // downloader.download().await;
        }
    });

    let pause_task = tokio::spawn({
        async {
            // tokio::time::sleep(Duration::from_secs(2)).await; // Simulating some delay
            // downloader.pause();
        }
    });

    // Await both tasks to complete
    let _ = tokio::try_join!(download_task, pause_task);
}

如何解决此类问题?

asynchronous rust concurrency
1个回答
0
投票

我会将结构分成两部分。 内部包含下载任务所需的所有成员,以及一个原子布尔值,以便从另一个线程影响此任务。

所有这一切都依赖于内部可变性。 原子变量本质上是在多个并行任务之间共享的,但允许突变。 对内部部分的访问受到互斥锁的保护,但由于原子布尔值位于该内部部分之外,因此两者可以同时使用。

请在下面找到相应调整的示例。

use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, Mutex,
    },
    time::Duration,
};

struct InnerDownloader {
    amount: usize,
    // other members
    // ...
}
struct Downloader {
    downloading: AtomicBool,
    inner: Mutex<InnerDownloader>,
}

impl Downloader {
    pub fn new() -> Self {
        Self {
            downloading: AtomicBool::new(false),
            inner: Mutex::new(InnerDownloader { amount: 0 }),
        }
    }
    pub fn pause(&self) {
        self.downloading.store(false, Ordering::Relaxed);
    }
    pub async fn download(&self) {
        self.downloading.store(true, Ordering::Relaxed);
        // mimic downloading of a file
        loop {
            if !self.downloading.load(Ordering::Relaxed) {
                break;
            }
            // simulating download
            tokio::time::sleep(Duration::from_millis(250)).await;
            if let Ok(mut inner) = self.inner.lock() {
                inner.amount += 1;
            }
        }
        println!("leaving download()");
    }
    pub fn amount(&self) -> Option<usize> {
        self.inner.lock().ok().map(|inner| inner.amount)
    }
}

#[tokio::main]
async fn main() {
    let downloader = Arc::new(Downloader::new());

    let download_task = tokio::spawn({
        let downloader = Arc::clone(&downloader);
        async move {
            downloader.download().await;
        }
    });

    let pause_task = tokio::spawn({
        let downloader = Arc::clone(&downloader);
        async move {
            // Simulating some delay
            tokio::time::sleep(Duration::from_secs(2)).await;
            downloader.pause();
        }
    });

    // Await both tasks to complete
    let _ = tokio::try_join!(download_task, pause_task);
    println!("amount: {:?}", downloader.amount());
}
/*
leaving download()
amount: Some(8)
*/
© www.soinside.com 2019 - 2024. All rights reserved.