这个ThreadPool实现正确吗?

问题描述 投票:0回答:1

我是并发编码新手,并尝试自己实现简单的

ThreadPool
。我在学习网站(jenkov.com)上找到了这个实现,它似乎工作正常。

但是,我认为

thread
isStopped
应该至少是
volatile
类或
PoolThreadRunnable
变量中的
Atomic
,因为它们由两个线程共享,即在对象上运行该方法的线程(其中
isStopped=true
this.thread.interrupt()
被调用)以及正在运行实际运行代码的代码(我们在其中执行
this.thread=Thread.currentThread()
while(!isStopped())

我的理解正确还是我遗漏了什么?

public class ThreadPool {

    private BlockingQueue taskQueue = null;
    private List<PoolThreadRunnable> runnables = new ArrayList<>();
    private boolean isStopped = false;

    public ThreadPool(int noOfThreads, int maxNoOfTasks){
        taskQueue = new ArrayBlockingQueue(maxNoOfTasks);

        for(int i=0; i<noOfThreads; i++){
            PoolThreadRunnable poolThreadRunnable =
                    new PoolThreadRunnable(taskQueue);

            runnables.add(poolThreadRunnable);
        }
        for(PoolThreadRunnable runnable : runnables){
            new Thread(runnable).start();
        }
    }

    public synchronized void  execute(Runnable task) throws Exception{
        if(this.isStopped) throw
                new IllegalStateException("ThreadPool is stopped");

        this.taskQueue.offer(task);
    }

    public synchronized void stop(){
        this.isStopped = true;
        for(PoolThreadRunnable runnable : runnables){
            runnable.doStop();
        }
    }

    public synchronized void waitUntilAllTasksFinished() {
        while(this.taskQueue.size() > 0) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
public class PoolThreadRunnable implements Runnable {

    private Thread        thread    = null;
    private BlockingQueue taskQueue = null;
    private boolean       isStopped = false;

    public PoolThreadRunnable(BlockingQueue queue){
        taskQueue = queue;
    }

    public void run(){
        this.thread = Thread.currentThread();
        while(!isStopped()){
            try{
                Runnable runnable = (Runnable) taskQueue.take();
                runnable.run();
            } catch(Exception e){
                //log or otherwise report exception,
                //but keep pool thread alive.
            }
        }
    }

    public synchronized void doStop(){
        isStopped = true;
        //break pool thread out of dequeue() call.
        this.thread.interrupt();
    }

    public synchronized boolean isStopped(){
        return isStopped;
    }
}
java multithreading concurrency
1个回答
0
投票

这两个

isStopped
字段看起来都受到同步方法的适当保护。请注意,
while
方法中的
run()
循环调用
isStopped()
方法,而不是直接读取字段。但
thread
字段的分配有问题。该字段不是易失性的,并且写入不受同步块保护以匹配同步
doStop()
方法。

根据类的其余部分的实现方式,以下内容将是一个很好的解决方案:

public void run(){
    synchronized (this) {
        if (isStopped) {
            return;
        }
        this.thread = Thread.currentThread();
    }

    while(!isStopped()){
        try{
            Runnable runnable = (Runnable) taskQueue.take();
            runnable.run();
        } catch(Exception e){
            //log or otherwise report exception,
            //but keep pool thread alive.
        }
    }
}

为什么要在

this
上同步?因为同步的 instance 方法隐式地在
this
上进行同步,并且如果您想创建正确的 happens-before 关系,那么在同一个 object 上进行同步非常重要。另外,为了避免潜在的
NullPointerException
,您可能需要将
doStop()
更改为:

public synchronized void doStop(){
    isStopped = true;
    if (thread != null) {
        //break pool thread out of dequeue() call.
        thread.interrupt();
    }
}

我还有另一个问题(尽管无关紧要),即执行 Callable 会发生哪些变化。如果您知道任何参考资料,请给我指点,我将不胜感激。

我不知道有什么入门教程。但

Callable
的目的是能够返回结果,而
Runnable
无法做到这一点。您需要线程池返回一些表示异步结果的对象。在 Java Executor Framework 中,这是一个
Future
。您可以查看 ThreadPoolExecutor
FutureTask
实现
,了解它们是如何实现的。

因此,使用 Java Executor Framework 获得灵感,首先您应该创建一个类似于

Future
的界面。这个界面可以更简单,因为您只是想学习,而不是实现功能齐全的 API。您可以根据需要添加功能。

import java.util.concurrent.ExecutionException;

public interface Result<V> {

  /**
   * Gets the value of this {@code Result}, blocking until the background task
   * is done if necessary.
   *
   * @return the value of the background task
   * @throws ExecutionException if the background task throws an exception; the
   * thrown exception will be the cause of the {@code ExecutionException}
   * @throws InterruptedException if the calling thread is interrupted
   */
  V get() throws ExecutionException, InterruptedException;

  /**
   * Tests if the background task is complete.
   *
   * @return {@code true} if the background task is complete, {@code false}
   * otherwise
   */
  boolean isDone();
}

然后您应该创建一个实现 also 实现

Runnable
。此实现将接受
Callable
并负责调用它。

import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ResultTask<V> implements Result<V>, Runnable {

  private final AtomicBoolean executed = new AtomicBoolean();

  private final Lock lock = new ReentrantLock();
  private final Condition isDone = lock.newCondition();

  private final Callable<V> callable;
  // these three fields are guarded by 'lock'
  private boolean done;
  private V result;
  private Throwable error;

  public ResultTask(Callable<V> callable) {
    this.callable = Objects.requireNonNull(callable);
  }

  @Override
  public V get() throws ExecutionException, InterruptedException {
    lock.lockInterruptibly();
    try {
      while (!done) {
        // Called in a loop to handle a so-called "spurious wakeup" of the thread
        isDone.await();
      }

      if (error != null) {
        throw new ExecutionException(error);
      }
      return result;
    } finally {
      lock.unlock();
    }
  }

  @Override
  public boolean isDone() {
    lock.lock();
    try {
      return done;
    } finally {
      lock.unlock();
    }
  }

  @Override
  public void run() {
    if (!executed.compareAndSet(false, true)) {
      throw new IllegalStateException("task already executed");
    }

    try {
      // don't hold the lock while invoking the Callable
      V result = callable.call();
      complete(result, null);
    } catch (Throwable error) {
      complete(null, error);
    }
  }

  private void complete(V result, Throwable error) {
    lock.lock();
    try {
      this.result = result;
      this.error = error;
      done = true;
      // Signal all threads blocked on 'isDone.await()' to wake them up
      isDone.signalAll();
    } finally {
      lock.unlock();
    }
  }
}

然后你的

ThreadPool
将采取
Callable
并将其包裹在a
ResultTask
中。由于
ResultTask
实现了
Runnable
,因此您可以将其放入任务队列中以像平常一样执行。然后,您将
ResultTask
作为
Result
返回给调用者,以便可以在将来的某个时刻查询任务的结果。

public <V> Result<V> execute(Callable<V> callable) throws Exception {
  Objects.requireNonNull(callable);
  synchronized (this) {
    if (isStopped)
      throw new IllegalStateException("ThreadPool is stopped");

    ResultTask<V> task = new ResultTask<>(callable);
    taskQueue.offer(task);
    return task;
  }
}

警告: 可能应该检查

offer
的结果。见答案末尾。

您甚至可以将

execute(Runnable)
方法更改为:

public Result<Void> execute(Runnable runnable) throws Exception {
  Objects.requireNonNull(runnable);
  return execute(() -> {
    runnable.run();
    return null;
  });
}

这会让调用者知道

Runnable
何时完成,即使没有任何值。


顺便说一句,如果

BlockingQueue
java.util.concurrent.BlockingQueue
,请注意以下事项:

  1. 该界面是通用的。您尚未对其进行参数化,这意味着您正在使用原始类型。不要使用原始类型;使用

    BlockingQueue<Runnable>
    new ArrayBlockingQueue<>(...)
    (注意
    <>
    )。这也将消除在
    Runnable
    方法中转换为
    PoolThreadRunnable#run()
    的需要。

  2. 如果无法添加元素,例如因为队列已满,则
  3. offer(E)

    方法将返回

    false
    。您不检查此项,这意味着
    execute
    的调用者可能会认为任务已成功排队,尽管事实并非如此。如果
    offer
    返回
    false
    ,您可能应该抛出异常。在
    Java Executor Framework
    中,它将是一个 RejectedExecutionException
    
    

© www.soinside.com 2019 - 2024. All rights reserved.