我有一个线程接收消息并循环处理它们。 我想从外部停止该线程。最好的方法是什么?
我没有发现任何东西可以从外部中断线程。所以我的想法是使用一个共享的 AtomicBool ,它可以从外部设置并在线程内部进行检查。但是线程在等待消息时无法检查 AtomicBool,因此我在设置 AtomicBool 后也发送了一条空消息。
这是代码来说明
use std::{
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{Receiver, Sender},
Arc,
},
thread::{self, JoinHandle},
};
type Message = Option<u32>;
pub struct MessageHandler {
cancel_token: Arc<AtomicBool>,
pub sender: Sender<Message>,
join_handle: Option<JoinHandle<()>>,
}
impl MessageHandler {
pub fn new() -> MessageHandler {
let cancel_token = Arc::new(AtomicBool::new(false));
let (sender, receiver) = std::sync::mpsc::channel();
let join_handle = {
let cancel_token = cancel_token.clone();
thread::spawn(move || {
MessageHandler::handle_messages(receiver, cancel_token);
})
};
MessageHandler {
cancel_token,
sender,
join_handle: Some(join_handle),
}
}
pub fn send_message(&self, number: u32) {
self.sender.send(Some(number)).unwrap();
}
pub fn stop(&mut self) {
self.cancel_token.store(true, Ordering::Relaxed);
self.sender.send(None).unwrap();
self.join_handle.take().unwrap().join();
}
fn handle_messages(receiver: Receiver<Message>, cancel_token: Arc<AtomicBool>) {
for message in receiver {
if cancel_token.load(Ordering::Relaxed) {
return;
}
if let Some(_) = message {
// do something
}
}
}
}
我对此并不满意。它可能几乎总是有效。但我真的可以确定从线程的角度来看 AtomicBool 在收到消息之前就已设置吗?据我所知,我不能(AtomicBool 操作中的排序有什么作用?)。当然,我可以在收到 None 时结束线程,那么如果 AtomicBool 设置为较晚也没关系。 有更好的解决方案吗?
Receiver::recv
被删除时,Err
将返回
Sender
:
use std::{
sync::{
mpsc::{Receiver, Sender},
},
thread::{self, JoinHandle},
};
type Message = u32;
pub struct MessageHandler {
sender: Option<Sender<Message>>,
join_handle: Option<JoinHandle<()>>,
}
impl MessageHandler {
pub fn new() -> MessageHandler {
let (sender, receiver) = std::sync::mpsc::channel();
let join_handle = thread::spawn(move || {
MessageHandler::handle_messages(receiver);
});
MessageHandler {
sender: Some(sender),
join_handle: Some(join_handle),
}
}
pub fn send_message(&self, number: u32) {
if let Some(sender) = self.sender.as_ref() {
sender.send(number).unwrap();
} else {
panic!("send after stopped");
}
}
pub fn stop(&mut self) {
drop(std::mem::take(&mut self.sender));
self.join_handle.take().unwrap().join().unwrap();
}
fn handle_messages(receiver: Receiver<Message>) {
while let Ok(_message) = receiver.recv() {
// do something
}
}
}
关于您关于
AtomicBool
的问题,您可以在 std::sync::atomic::Ordering
的文档中阅读有关内存顺序的信息以及 Rust 所基于的链接 C++ 内存顺序。