我正在开发一个 Rust 项目,其中我正在实现一个可以动态接受具有不同返回类型的任务的线程池。以下是我迄今为止编写的代码:
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
type Task<R> = Box<dyn FnOnce() -> R + Send + 'static>;
struct ThreadPool<R> {
workers: Vec<Worker<R>>,
sender: mpsc::Sender<Task<R>>,
}
struct Worker<R> {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl<R> Worker<R> {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Task<R>>>>) -> Worker<R> {
let thread = thread::spawn(move || loop {
let task = receiver.lock().unwrap().recv().unwrap();
task();
});
Worker {
id,
thread: Some(thread),
}
}
}
impl<R> ThreadPool<R> {
fn new(size: usize) -> ThreadPool<R> {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
fn execute<F>(&self, f: F)
where
F: FnOnce() -> R + Send + 'static,
{
let task = Box::new(f);
self.sender.send(task).unwrap();
}
}
问题出现在Worker结构体的实现中:
impl<R> Worker<R> {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Task<R>>>>) -> Worker<R> {
let thread = thread::spawn(move || loop {
let task = receiver.lock().unwrap().recv().unwrap();
task();
});
Worker {
id,
thread: Some(thread),
}
}
}
在这里,我传递一个接收器来初始化 Worker,这需要通用参数 R。但是,这会导致问题,因为我希望我的线程池动态处理具有不同返回类型的任务。
如何修改此代码,以便我的线程池可以接受并执行具有不同返回类型的任务,而不会遇到通用参数的问题?我应该考虑使用特征对象或其他模式来实现这种灵活性吗?
我将分享我的线程池实现,请注意,这不是一个完全令人满意的答案。
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
type JobReturn = Box<dyn Send>;
type Job = Box<dyn FnOnce() -> JobReturn + Send + 'static>;
struct Assignment(Job, mpsc::Sender<JobReturn>);
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Assignment>>>) -> Worker {
Worker {
id,
thread: thread::spawn(move || loop {
let assignment = receiver.lock().unwrap().recv().unwrap();
let ret = assignment.0();
let _result = assignment.1.send(ret);
}),
}
}
}
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Assignment>,
}
impl ThreadPool {
pub fn new(pool_size: usize) -> ThreadPool {
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(pool_size);
for id in 0..pool_size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F) -> mpsc::Receiver<JobReturn>
where
F: FnOnce() -> JobReturn + Send + 'static,
{
let (sender, receiver) = mpsc::channel();
let job = Box::new(f);
self.sender.send(Assignment(job, sender)).unwrap();
receiver
}
}
#[cfg(test)]
mod tests {
use std::{
thread,
time::{Duration, Instant},
};
use super::ThreadPool;
#[test]
fn can_execute_job_and_receive_result() {
let pool = ThreadPool::new(1);
let receiver = pool.execute(|| Box::new(1));
let result = receiver.recv();
assert!(result.is_ok());
}
}
我是 Rust 新手,如果错误请纠正我:
我当前的实现有一个限制,即您从作业返回的结果没有类型信息(它是
type JobReturn = Box<dyn Send>
)。
我尝试使用像
这样的泛型// impl ThreadPool
pub fn execute<F, T>(&self, f: F) -> mpsc::Receiver<T> {...}
但是如果我这样做,我正在创建一个只能返回这种类型的ThreadPool,它不是动态的。您不能使 ThreadPool 的此实例返回任何其他类型。
所以我认为最好将结果的检索处理到作业本身,ThreadPool 不应该这样做。T