我是并发编码新手,并尝试自己实现简单的
ThreadPool
。我在学习网站(jenkov.com)上找到了这个实现,它似乎工作正常。
但是,我认为
thread
和 isStopped
应该至少是 volatile
类或 PoolThreadRunnable
变量中的 Atomic
,因为它们由两个线程共享,即在对象上运行该方法的线程(其中 isStopped=true
和 this.thread.interrupt()
被调用)以及正在运行实际运行代码的代码(我们在其中执行 this.thread=Thread.currentThread()
和 while(!isStopped())
。
我的理解正确还是我遗漏了什么?
public class ThreadPool {
private BlockingQueue taskQueue = null;
private List<PoolThreadRunnable> runnables = new ArrayList<>();
private boolean isStopped = false;
public ThreadPool(int noOfThreads, int maxNoOfTasks){
taskQueue = new ArrayBlockingQueue(maxNoOfTasks);
for(int i=0; i<noOfThreads; i++){
PoolThreadRunnable poolThreadRunnable =
new PoolThreadRunnable(taskQueue);
runnables.add(poolThreadRunnable);
}
for(PoolThreadRunnable runnable : runnables){
new Thread(runnable).start();
}
}
public synchronized void execute(Runnable task) throws Exception{
if(this.isStopped) throw
new IllegalStateException("ThreadPool is stopped");
this.taskQueue.offer(task);
}
public synchronized void stop(){
this.isStopped = true;
for(PoolThreadRunnable runnable : runnables){
runnable.doStop();
}
}
public synchronized void waitUntilAllTasksFinished() {
while(this.taskQueue.size() > 0) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class PoolThreadRunnable implements Runnable {
private Thread thread = null;
private BlockingQueue taskQueue = null;
private boolean isStopped = false;
public PoolThreadRunnable(BlockingQueue queue){
taskQueue = queue;
}
public void run(){
this.thread = Thread.currentThread();
while(!isStopped()){
try{
Runnable runnable = (Runnable) taskQueue.take();
runnable.run();
} catch(Exception e){
//log or otherwise report exception,
//but keep pool thread alive.
}
}
}
public synchronized void doStop(){
isStopped = true;
//break pool thread out of dequeue() call.
this.thread.interrupt();
}
public synchronized boolean isStopped(){
return isStopped;
}
}
这两个
isStopped
字段看起来都受到同步方法的适当保护。请注意,while
方法中的run()
循环调用isStopped()
方法,而不是直接读取字段。但 thread
字段的分配有问题。该字段不是易失性的,并且写入不受同步块保护以匹配同步 doStop()
方法。
根据类的其余部分的实现方式,以下内容将是一个很好的解决方案:
public void run(){
synchronized (this) {
if (isStopped) {
return;
}
this.thread = Thread.currentThread();
}
while(!isStopped()){
try{
Runnable runnable = (Runnable) taskQueue.take();
runnable.run();
} catch(Exception e){
//log or otherwise report exception,
//but keep pool thread alive.
}
}
}
为什么要在
this
上同步?因为同步的 instance 方法隐式地在 this
上进行同步,并且如果您想创建正确的 happens-before 关系,那么在同一个 object 上进行同步非常重要。另外,为了避免潜在的 NullPointerException
,您可能需要将 doStop()
更改为:
public synchronized void doStop(){
isStopped = true;
if (thread != null) {
//break pool thread out of dequeue() call.
thread.interrupt();
}
}
我还有另一个问题(尽管无关紧要),即执行 Callable 会发生哪些变化。如果您知道任何参考资料,请给我指点,我将不胜感激。
我不知道有什么入门教程。但
Callable
的目的是能够返回结果,而 Runnable
无法做到这一点。您需要线程池返回一些表示异步结果的对象。在 Java Executor Framework 中,这是一个 Future
。您可以查看 ThreadPoolExecutor
和
FutureTask
的实现,了解它们是如何实现的。
因此,使用 Java Executor Framework 获得灵感,首先您应该创建一个类似于
Future
的界面。这个界面可以更简单,因为您只是想学习,而不是实现功能齐全的 API。您可以根据需要添加功能。
import java.util.concurrent.ExecutionException;
public interface Result<V> {
/**
* Gets the value of this {@code Result}, blocking until the background task
* is done if necessary.
*
* @return the value of the background task
* @throws ExecutionException if the background task throws an exception; the
* thrown exception will be the cause of the {@code ExecutionException}
* @throws InterruptedException if the calling thread is interrupted
*/
V get() throws ExecutionException, InterruptedException;
/**
* Tests if the background task is complete.
*
* @return {@code true} if the background task is complete, {@code false}
* otherwise
*/
boolean isDone();
}
然后您应该创建一个实现 also 实现
Runnable
。此实现将接受 Callable
并负责调用它。
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ResultTask<V> implements Result<V>, Runnable {
private final AtomicBoolean executed = new AtomicBoolean();
private final Lock lock = new ReentrantLock();
private final Condition isDone = lock.newCondition();
private final Callable<V> callable;
// these three fields are guarded by 'lock'
private boolean done;
private V result;
private Throwable error;
public ResultTask(Callable<V> callable) {
this.callable = Objects.requireNonNull(callable);
}
@Override
public V get() throws ExecutionException, InterruptedException {
lock.lockInterruptibly();
try {
while (!done) {
// Called in a loop to handle a so-called "spurious wakeup" of the thread
isDone.await();
}
if (error != null) {
throw new ExecutionException(error);
}
return result;
} finally {
lock.unlock();
}
}
@Override
public boolean isDone() {
lock.lock();
try {
return done;
} finally {
lock.unlock();
}
}
@Override
public void run() {
if (!executed.compareAndSet(false, true)) {
throw new IllegalStateException("task already executed");
}
try {
// don't hold the lock while invoking the Callable
V result = callable.call();
complete(result, null);
} catch (Throwable error) {
complete(null, error);
}
}
private void complete(V result, Throwable error) {
lock.lock();
try {
this.result = result;
this.error = error;
done = true;
// Signal all threads blocked on 'isDone.await()' to wake them up
isDone.signalAll();
} finally {
lock.unlock();
}
}
}
然后你的
ThreadPool
将采取Callable
并将其包裹在aResultTask
中。由于 ResultTask
实现了 Runnable
,因此您可以将其放入任务队列中以像平常一样执行。然后,您将 ResultTask
作为 Result
返回给调用者,以便可以在将来的某个时刻查询任务的结果。
public <V> Result<V> execute(Callable<V> callable) throws Exception {
Objects.requireNonNull(callable);
synchronized (this) {
if (isStopped)
throw new IllegalStateException("ThreadPool is stopped");
ResultTask<V> task = new ResultTask<>(callable);
taskQueue.offer(task);
return task;
}
}
警告: 可能应该检查
offer
的结果。见答案末尾。
您甚至可以将
execute(Runnable)
方法更改为:
public Result<Void> execute(Runnable runnable) throws Exception {
Objects.requireNonNull(runnable);
return execute(() -> {
runnable.run();
return null;
});
}
这会让调用者知道
Runnable
何时完成,即使没有任何值。
顺便说一句,如果
BlockingQueue
是 java.util.concurrent.BlockingQueue
,请注意以下事项:
该界面是通用的。您尚未对其进行参数化,这意味着您正在使用原始类型。不要使用原始类型;使用
BlockingQueue<Runnable>
和 new ArrayBlockingQueue<>(...)
(注意 <>
)。这也将消除在 Runnable
方法中转换为 PoolThreadRunnable#run()
的需要。
offer(E)
方法将返回
false
。您不检查此项,这意味着 execute
的调用者可能会认为任务已成功排队,尽管事实并非如此。如果 offer
返回 false
,您可能应该抛出异常。在 Java Executor Framework中,它将是一个
RejectedExecutionException
。