有没有办法避免此实现中的“静态生命周期要求”?

问题描述 投票:0回答:1

我正在尝试更好地理解 Rust 中的并发性。我得到了以下任务:

A

Looper
是反应式中经常使用的范例。创建时,
Looper
会创建一个通用
Message
对象队列和一个线程。

线程等待(不消耗 CPU 周期)消息出现在队列中 按照到达的顺序一一提取,并进行处理。 Looper 构造函数接收两个参数,都是函数类型(指向):

process(…)
cleanup()
。第一个是负责处理通过队列接收到的各个消息的函数;该函数接受
Message
类型的单个输入参数并且不返回任何内容;第二个私有函数,有参数和返回值,当它代表终端时,将被封装在Looper中的线程调用。

Looper
除了服务方法之外,还提供了管理其第二个生命周期所需的独特的公共、线程安全方法:
send(msg)
,它接受类型的通用对象作为参数 Message,该方法将被插入到队列,随后从线程中提取并转发到处理函数。当 Looper 对象被销毁时,必须确保其中包含的线程调用构造函数中传递的第二个函数,然后终止。

使用 Rust 语言实现它。作为指导,请记住其方法,它们必须是线程安全的。

我想了解

'static
T
生命周期是否必要,或者可以避免采取不同的行为; 而且我不确定在运行时它是否会正常工作(特别是在删除
Looper
时)。

use crossbeam_channel::{bounded, Receiver, Sender};
use std::collections::VecDeque;
use std::rc::{Rc, Weak};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::JoinHandle;

struct Message<T> {
    msg: T,
}

struct Queue<T> {
    message_queue: Mutex<VecDeque<Message<T>>>,
    cv: Condvar,
}

struct Looper<T> {
    queue: Arc<Queue<T>>,
    handle: JoinHandle<()>,
    terminate: Arc<AtomicBool>,
}

impl<T> Drop for Looper<T> {
    fn drop(&mut self) {
        self.terminate.store(true, Ordering::Relaxed);
        self.queue.cv.notify_all();
    }
}

// Implement methods for Looper
impl<T: Send + 'static> Looper<T> {
    fn new(process: fn(Message<T>), cleanup: fn()) -> Looper<T> {
        let queue = Arc::new(Queue {
            message_queue: Mutex::new(VecDeque::new()),
            cv: Condvar::new(),
        });
        let queue_clone = Arc::clone(&queue);
        let end_thread = Arc::new(AtomicBool::new(false));
        let end_thread_clone = Arc::clone(&end_thread);
        Looper {
            queue,
            handle: thread::spawn(move || {
                loop {
                    let mut lock = queue_clone.message_queue.lock().unwrap();

                    while lock.is_empty() && !end_thread_clone.load(Ordering::Relaxed) {
                        lock = queue_clone.cv.wait(lock).unwrap();
                    }

                    if end_thread_clone.load(Ordering::Relaxed) {
                        break;
                    }

                    if let Some(message) = lock.pop_back() {
                        drop(lock);
                        process(message);
                    }
                }
                cleanup();
            }),
            terminate: end_thread,
        }
    }
    fn send(&self, msg: Message<T>) {
        let mut queue = self.queue.message_queue.lock().unwrap();
        queue.push_back(msg);
        self.queue.cv.notify_all();
    }
}
multithreading rust concurrency thread-safety atomic
1个回答
0
投票

由于您使用的线程,

'static
生命周期是必要的。
'static
绑定在类型上意味着该类型可以永远存在;这往往适用于像
String
这样的自有数据类型,但不适用于像
&str
这样的借用类型(根据定义,它们通常有一些较短的生命周期要求,因为它们的寿命不得超过它们引用的数据)。

因为您要将这些消息发送到另一个线程(使用

thread::spawn
创建),并且该线程对 it 可以运行的数量没有限制,所以您发送给它的数据必须是
'static
,因为该线程理论上可以比你能想到的任何其他寿命都长。

请注意,出现此要求是因为您使用的是

thread::spawn
,它创建了无限生命周期的线程。另一种方法是使用
thread::scope
,它允许创建保证不会超过某个生命周期的线程。您需要将
Scope
对象传递到
Looper
构造函数才能正常工作。

© www.soinside.com 2019 - 2024. All rights reserved.