我正在尝试更好地理解 Rust 中的并发性。我得到了以下任务:
A
是反应式中经常使用的范例。创建时,Looper
会创建一个通用Looper
对象队列和一个线程。Message
线程等待(不消耗 CPU 周期)消息出现在队列中 按照到达的顺序一一提取,并进行处理。 Looper 构造函数接收两个参数,都是函数类型(指向):
和process(…)
。第一个是负责处理通过队列接收到的各个消息的函数;该函数接受cleanup()
类型的单个输入参数并且不返回任何内容;第二个私有函数,有参数和返回值,当它代表终端时,将被封装在Looper中的线程调用。Message
除了服务方法之外,还提供了管理其第二个生命周期所需的独特的公共、线程安全方法:Looper
,它接受类型的通用对象作为参数 Message,该方法将被插入到队列,随后从线程中提取并转发到处理函数。当 Looper 对象被销毁时,必须确保其中包含的线程调用构造函数中传递的第二个函数,然后终止。send(msg)
使用 Rust 语言实现它。作为指导,请记住其方法,它们必须是线程安全的。
我想了解
'static
的 T
生命周期是否必要,或者可以避免采取不同的行为;
而且我不确定在运行时它是否会正常工作(特别是在删除 Looper
时)。
use crossbeam_channel::{bounded, Receiver, Sender};
use std::collections::VecDeque;
use std::rc::{Rc, Weak};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::JoinHandle;
struct Message<T> {
msg: T,
}
struct Queue<T> {
message_queue: Mutex<VecDeque<Message<T>>>,
cv: Condvar,
}
struct Looper<T> {
queue: Arc<Queue<T>>,
handle: JoinHandle<()>,
terminate: Arc<AtomicBool>,
}
impl<T> Drop for Looper<T> {
fn drop(&mut self) {
self.terminate.store(true, Ordering::Relaxed);
self.queue.cv.notify_all();
}
}
// Implement methods for Looper
impl<T: Send + 'static> Looper<T> {
fn new(process: fn(Message<T>), cleanup: fn()) -> Looper<T> {
let queue = Arc::new(Queue {
message_queue: Mutex::new(VecDeque::new()),
cv: Condvar::new(),
});
let queue_clone = Arc::clone(&queue);
let end_thread = Arc::new(AtomicBool::new(false));
let end_thread_clone = Arc::clone(&end_thread);
Looper {
queue,
handle: thread::spawn(move || {
loop {
let mut lock = queue_clone.message_queue.lock().unwrap();
while lock.is_empty() && !end_thread_clone.load(Ordering::Relaxed) {
lock = queue_clone.cv.wait(lock).unwrap();
}
if end_thread_clone.load(Ordering::Relaxed) {
break;
}
if let Some(message) = lock.pop_back() {
drop(lock);
process(message);
}
}
cleanup();
}),
terminate: end_thread,
}
}
fn send(&self, msg: Message<T>) {
let mut queue = self.queue.message_queue.lock().unwrap();
queue.push_back(msg);
self.queue.cv.notify_all();
}
}
由于您使用的线程,
'static
生命周期是必要的。 'static
绑定在类型上意味着该类型可以永远存在;这往往适用于像 String
这样的自有数据类型,但不适用于像 &str
这样的借用类型(根据定义,它们通常有一些较短的生命周期要求,因为它们的寿命不得超过它们引用的数据)。
因为您要将这些消息发送到另一个线程(使用
thread::spawn
创建),并且该线程对 it 可以运行的数量没有限制,所以您发送给它的数据必须是 'static
,因为该线程理论上可以比你能想到的任何其他寿命都长。
请注意,出现此要求是因为您使用的是
thread::spawn
,它创建了无限生命周期的线程。另一种方法是使用 thread::scope
,它允许创建保证不会超过某个生命周期的线程。您需要将 Scope
对象传递到 Looper
构造函数才能正常工作。