在 Rust 中我应该如何从外部停止线程?

问题描述 投票:0回答:1

我有一个线程接收消息并循环处理它们。 我想从外部停止该线程。最好的方法是什么?

我没有发现任何东西可以从外部中断线程。所以我的想法是使用一个共享的 AtomicBool ,它可以从外部设置并在线程内部进行检查。但是线程在等待消息时无法检查 AtomicBool,因此我在设置 AtomicBool 后也发送了一条空消息。

这是代码来说明

use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        mpsc::{Receiver, Sender},
        Arc,
    },
    thread::{self, JoinHandle},
};

type Message = Option<u32>;

pub struct MessageHandler {
    cancel_token: Arc<AtomicBool>,
    pub sender: Sender<Message>,
    join_handle: Option<JoinHandle<()>>,
}

impl MessageHandler {
    pub fn new() -> MessageHandler {
        let cancel_token = Arc::new(AtomicBool::new(false));
        let (sender, receiver) = std::sync::mpsc::channel();

        let join_handle = {
            let cancel_token = cancel_token.clone();
            thread::spawn(move || {
                MessageHandler::handle_messages(receiver, cancel_token);
            })
        };

        MessageHandler {
            cancel_token,
            sender,
            join_handle: Some(join_handle),
        }
    }

    pub fn send_message(&self, number: u32) {
        self.sender.send(Some(number)).unwrap();
    }

    pub fn stop(&mut self) {
        self.cancel_token.store(true, Ordering::Relaxed);
        self.sender.send(None).unwrap();
        self.join_handle.take().unwrap().join();
    }

    fn handle_messages(receiver: Receiver<Message>, cancel_token: Arc<AtomicBool>) {
        for message in receiver {
            if cancel_token.load(Ordering::Relaxed) {
                return;
            }

            if let Some(_) = message {
                // do something
            }
        }
    }
}

我对此并不满意。它可能几乎总是有效。但我真的可以确定从线程的角度来看 AtomicBool 在收到消息之前就已设置吗?据我所知,我不能(AtomicBool 操作中的排序有什么作用?)。当然,我可以在收到 None 时结束线程,那么如果 AtomicBool 设置为较晚也没关系。 有更好的解决方案吗?

multithreading rust
1个回答
0
投票

问题有更好的解决方案!您可以利用以下事实:当相应的

Receiver::recv
被删除时,
Err
 将返回 
Sender

use std::{
    sync::{
        mpsc::{Receiver, Sender},
    },
    thread::{self, JoinHandle},
};

type Message = u32;

pub struct MessageHandler {
    sender: Option<Sender<Message>>,
    join_handle: Option<JoinHandle<()>>,
}

impl MessageHandler {
    pub fn new() -> MessageHandler {
        let (sender, receiver) = std::sync::mpsc::channel();

        let join_handle = thread::spawn(move || {
            MessageHandler::handle_messages(receiver);
        });

        MessageHandler {
            sender: Some(sender),
            join_handle: Some(join_handle),
        }
    }

    pub fn send_message(&self, number: u32) {
        if let Some(sender) = self.sender.as_ref() {
            sender.send(number).unwrap();
        } else {
            panic!("send after stopped");
        }
    }

    pub fn stop(&mut self) {
        drop(std::mem::take(&mut self.sender));
        self.join_handle.take().unwrap().join().unwrap();
    }

    fn handle_messages(receiver: Receiver<Message>) {
        while let Ok(_message) = receiver.recv() {
            // do something
        }
    }
}

关于您关于

AtomicBool
的问题,您可以在
std::sync::atomic::Ordering
的文档中阅读有关内存顺序的信息以及 Rust 所基于的链接 C++ 内存顺序

© www.soinside.com 2019 - 2024. All rights reserved.