我想创建一个ThreadPoolExecutor
,当它达到最大大小并且队列已满时,submit()
方法在尝试添加新任务时会阻塞。我是否需要为此实现自定义RejectedExecutionHandler
,或者是否存在使用标准Java库执行此操作的方法?
我刚刚发现的一种可能的解决方案:
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command)
throws InterruptedException, RejectedExecutionException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
throw e;
}
}
}
还有其他解决方案吗?我更喜欢基于RejectedExecutionHandler
的东西,因为它似乎是处理这种情况的标准方法。
创建自己的阻塞队列以供Executor使用,具有您正在寻找的阻塞行为,同时始终返回可用的剩余容量(确保执行程序不会尝试创建比其核心池更多的线程,或触发拒绝处理程序)。
我相信这会让你获得正在寻找的阻止行为。拒绝处理程序永远不适合账单,因为这表明执行程序无法执行任务。我可以想象的是,你在处理程序中得到某种形式的“忙碌等待”。这不是你想要的,你想要一个阻止调用者的执行程序的队列......
您应该看看this link (notifying-blocking-thread-pool),它总结了几个解决方案,最后给出了一个优雅的通知。
避免@FixPoint解决方案出现问题。可以使用ListeningExecutorService并在FutureCallback中释放信号量onSuccess和onFailure。
最近我发现这个问题有同样的问题。 OP没有明确说明,但是我们不想使用在提交者线程上执行任务的RejectedExecutionHandler
,因为如果这个任务是一个长期运行的任务,这将不足利用工作线程。
阅读所有答案和评论,特别是使用信号量或使用afterExecute
的有缺陷的解决方案我仔细查看了ThreadPoolExecutor的代码,看看是否有一些出路。令我惊讶的是,有超过2000行(注释)代码,其中一些让我觉得dizzy。鉴于我实际上有一个相当简单的要求 - 一个生产者,几个消费者,让生产者阻止没有消费者可以工作 - 我决定推出自己的解决方案。它不是ExecutorService
而只是Executor
。并且它不会使线程数量适应工作负载,但仅保留固定数量的线程,这也符合我的要求。这是代码。随意咆哮:-)
package x;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
/**
* distributes {@code Runnable}s to a fixed number of threads. To keep the
* code lean, this is not an {@code ExecutorService}. In particular there is
* only very simple support to shut this executor down.
*/
public class ParallelExecutor implements Executor {
// other bounded queues work as well and are useful to buffer peak loads
private final BlockingQueue<Runnable> workQueue =
new SynchronousQueue<Runnable>();
private final Thread[] threads;
/*+**********************************************************************/
/**
* creates the requested number of threads and starts them to wait for
* incoming work
*/
public ParallelExecutor(int numThreads) {
this.threads = new Thread[numThreads];
for(int i=0; i<numThreads; i++) {
// could reuse the same Runner all over, but keep it simple
Thread t = new Thread(new Runner());
this.threads[i] = t;
t.start();
}
}
/*+**********************************************************************/
/**
* returns immediately without waiting for the task to be finished, but may
* block if all worker threads are busy.
*
* @throws RejectedExecutionException if we got interrupted while waiting
* for a free worker
*/
@Override
public void execute(Runnable task) {
try {
workQueue.put(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("interrupt while waiting for a free "
+ "worker.", e);
}
}
/*+**********************************************************************/
/**
* Interrupts all workers and joins them. Tasks susceptible to an interrupt
* will preempt their work. Blocks until the last thread surrendered.
*/
public void interruptAndJoinAll() throws InterruptedException {
for(Thread t : threads) {
t.interrupt();
}
for(Thread t : threads) {
t.join();
}
}
/*+**********************************************************************/
private final class Runner implements Runnable {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
Runnable task;
try {
task = workQueue.take();
} catch (InterruptedException e) {
// canonical handling despite exiting right away
Thread.currentThread().interrupt();
return;
}
try {
task.run();
} catch (RuntimeException e) {
// production code to use a logging framework
e.printStackTrace();
}
}
}
}
}
我相信通过使用java.util.concurrent.Semaphore
和委托Executor.newFixedThreadPool
的行为来解决这个问题有相当优雅的方法。新的执行程序服务只有在有线程执行时才会执行新任务。阻塞由Semaphore管理,许可数等于线程数。任务完成后,返回许可。
public class FixedThreadBlockingExecutorService extends AbstractExecutorService {
private final ExecutorService executor;
private final Semaphore blockExecution;
public FixedThreadBlockingExecutorService(int nTreads) {
this.executor = Executors.newFixedThreadPool(nTreads);
blockExecution = new Semaphore(nTreads);
}
@Override
public void shutdown() {
executor.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
return executor.shutdownNow();
}
@Override
public boolean isShutdown() {
return executor.isShutdown();
}
@Override
public boolean isTerminated() {
return executor.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executor.awaitTermination(timeout, unit);
}
@Override
public void execute(Runnable command) {
blockExecution.acquireUninterruptibly();
executor.execute(() -> {
try {
command.run();
} finally {
blockExecution.release();
}
});
}
我过去也有同样的需求:一种由共享线程池支持的每个客户端具有固定大小的阻塞队列。我最终编写了自己的ThreadPoolExecutor:
UserThreadPoolExecutor(阻塞队列(每个客户端)+线程池(在所有客户端之间共享))
见:https://github.com/d4rxh4wx/UserThreadPoolExecutor
每个UserThreadPoolExecutor都从共享的ThreadPoolExecutor获得最大线程数
每个用户ThreadPoolExecutor都可以:
我在弹性搜索客户端找到了这个拒绝政策。它阻塞阻塞队列上的调用者线程。代码如下 -
static class ForceQueuePolicy implements XRejectedExecutionHandler
{
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
try
{
executor.getQueue().put(r);
}
catch (InterruptedException e)
{
//should never happen since we never wait
throw new EsRejectedExecutionException(e);
}
}
@Override
public long rejected()
{
return 0;
}
}
我最近需要实现类似的东西,但是在ScheduledExecutorService
上。
我还必须确保我处理在方法上传递的延迟,并确保任务被提交以执行,因为调用者期望或者只是失败因此抛出RejectedExecutionException
。
ScheduledThreadPoolExecutor
执行或提交任务的其他方法在内部调用#schedule
,它仍然会调用覆盖的方法。
import java.util.concurrent.*;
public class BlockingScheduler extends ScheduledThreadPoolExecutor {
private final Semaphore maxQueueSize;
public BlockingScheduler(int corePoolSize,
ThreadFactory threadFactory,
int maxQueueSize) {
super(corePoolSize, threadFactory, new AbortPolicy());
this.maxQueueSize = new Semaphore(maxQueueSize);
}
@Override
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
final long newDelayInMs = beforeSchedule(command, unit.toMillis(delay));
return super.schedule(command, newDelayInMs, TimeUnit.MILLISECONDS);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
final long newDelayInMs = beforeSchedule(callable, unit.toMillis(delay));
return super.schedule(callable, newDelayInMs, TimeUnit.MILLISECONDS);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
return super.scheduleAtFixedRate(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
final long newDelayInMs = beforeSchedule(command, unit.toMillis(initialDelay));
return super.scheduleWithFixedDelay(command, newDelayInMs, unit.toMillis(period), TimeUnit.MILLISECONDS);
}
@Override
protected void afterExecute(Runnable runnable, Throwable t) {
super.afterExecute(runnable, t);
try {
if (t == null && runnable instanceof Future<?>) {
try {
((Future<?>) runnable).get();
} catch (CancellationException | ExecutionException e) {
t = e;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null) {
System.err.println(t);
}
} finally {
releaseQueueUsage();
}
}
private long beforeSchedule(Runnable runnable, long delay) {
try {
return getQueuePermitAndModifiedDelay(delay);
} catch (InterruptedException e) {
getRejectedExecutionHandler().rejectedExecution(runnable, this);
return 0;
}
}
private long beforeSchedule(Callable callable, long delay) {
try {
return getQueuePermitAndModifiedDelay(delay);
} catch (InterruptedException e) {
getRejectedExecutionHandler().rejectedExecution(new FutureTask(callable), this);
return 0;
}
}
private long getQueuePermitAndModifiedDelay(long delay) throws InterruptedException {
final long beforeAcquireTimeStamp = System.currentTimeMillis();
maxQueueSize.tryAcquire(delay, TimeUnit.MILLISECONDS);
final long afterAcquireTimeStamp = System.currentTimeMillis();
return afterAcquireTimeStamp - beforeAcquireTimeStamp;
}
private void releaseQueueUsage() {
maxQueueSize.release();
}
}
我在这里有代码,会感谢任何反馈。 https://github.com/AmitabhAwasthi/BlockingScheduler
这个解决方案看起来效果非常好。它被称为NotifyingBlockingThreadPoolExecutor。
编辑:这个代码有一个issue,await()方法是错误的。调用shutdown()+ awaitTermination()似乎工作正常。
我并不总是喜欢CallerRunsPolicy,特别是因为它允许被拒绝的任务“跳过队列”并在之前提交的任务之前执行。此外,在调用线程上执行任务可能比等待第一个插槽变得可用花费更长的时间。
我使用自定义的RejectedExecutionHandler解决了这个问题,它只是暂时阻塞调用线程,然后再次尝试提交任务:
public class BlockWhenQueueFull implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// The pool is full. Wait, then try again.
try {
long waitMs = 250;
Thread.sleep(waitMs);
} catch (InterruptedException interruptedException) {}
executor.execute(r);
}
}
这个类可以像在任何其他类似的RejectedExecutinHandler中一样在线程池执行器中使用,例如:
executorPool = new ThreadPoolExecutor(1, 1, 10,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new BlockWhenQueueFull());
我看到的唯一缺点是调用线程可能会被锁定比严格必要的时间长(最长250毫秒)。此外,由于这个执行器被有效地递归调用,因此很长时间等待线程可用(小时)可能导致堆栈溢出。
不过,我个人喜欢这种方法。它结构紧凑,易于理解,运行良好。
您可以使用ThreadPoolExecutor和blockingQueue:
public class ImageManager {
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
private ExecutorService executorService = new ThreadPoolExecutor(numOfThread, numOfThread,
0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);
private int downloadThumbnail(String fileListPath){
executorService.submit(new yourRunnable());
}
}
您应该使用CallerRunsPolicy
,它在调用线程中执行被拒绝的任务。这样,在该任务完成之前,它不能向执行程序提交任何新任务,此时将有一些空闲池线程或该进程将重复。
来自文档:
被拒绝的任务
当Executor关闭时,以及当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时,将拒绝方法execute(java.lang.Runnable)中提交的新任务。在任何一种情况下,execute方法都会调用其RejectedExecutionHandler的RejectedExecutionHandler.rejectedExecution(java.lang.Runnable,java.util.concurrent.ThreadPoolExecutor)方法。提供了四种预定义的处理程序策
- 在默认的ThreadPoolExecutor.AbortPolicy中,处理程序在拒绝时抛出运行时RejectedExecutionException。
- 在ThreadPoolExecutor.CallerRunsPolicy中,调用execute本身的线程运行该任务。这提供了一种简单的反馈控制机制,可以降低新任务的提交速度。
- 在ThreadPoolExecutor.DiscardPolicy中,简单地删除了无法执行的任务。
- 在ThreadPoolExecutor.DiscardOldestPolicy中,如果执行程序未关闭,则会删除工作队列头部的任务,然后重试执行(可能会再次失败,导致重复执行此操作)。
此外,在调用ThreadPoolExecutor
构造函数时,请确保使用有界队列,例如ArrayBlockingQueue。否则,什么都不会被拒绝。
编辑:响应您的注释,将ArrayBlockingQueue的大小设置为等于线程池的最大大小,并使用AbortPolicy。
编辑2:好的,我知道你得到了什么。那怎么样:覆盖beforeExecute()
方法来检查getActiveCount()
是否不超过getMaximumPoolSize()
,如果确实如此,请睡觉再试一次?
Hibernate有一个简单的BlockPolicy
,可以做你想要的:
/**
* A handler for rejected tasks that will have the caller block until
* space is available.
*/
public static class BlockPolicy implements RejectedExecutionHandler {
/**
* Creates a <tt>BlockPolicy</tt>.
*/
public BlockPolicy() { }
/**
* Puts the Runnable to the blocking queue, effectively blocking
* the delegating thread until space is available.
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
try {
e.getQueue().put( r );
}
catch (InterruptedException e1) {
log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
}
}
}
如果对Executor使用无界队列,或者信号量绑定不大于队列大小,那么上面引用的Java Concurrency in Practice中引用的BoundedExecutor
答案只能正常工作。信号量是提交线程和池中线程之间共享的状态,即使队列大小<bound <=(队列大小+池大小),也可以使执行器饱和。
使用CallerRunsPolicy
只有在你的任务不能永久运行时才有效,在这种情况下你的提交线程将永远保留在rejectedExecution
中,如果你的任务需要很长时间才能运行,这是一个坏主意,因为提交线程无法提交任何新的任务或其他任何事情,如果它自己运行任务。
如果这是不可接受的,那么我建议在提交任务之前检查执行程序的有界队列的大小。如果队列已满,请等待一小段时间再尝试再次提交。吞吐量会受到影响,但我认为它比许多其他提议的解决方案更简单,你保证不会有任何任务被拒绝。
我知道,这是一个黑客,但在我看来这里提供的最干净的黑客;-)
因为ThreadPoolExecutor使用阻塞队列“offer”而不是“put”,所以让我们覆盖阻塞队列的“offer”行为:
class BlockingQueueHack<T> extends ArrayBlockingQueue<T> {
BlockingQueueHack(int size) {
super(size);
}
public boolean offer(T task) {
try {
this.put(task);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return true;
}
}
ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5));
我测试了它似乎工作。实施一些超时策略留给读者练习。
以下类包装ThreadPoolExecutor并使用信号量阻止工作队列已满:
public final class BlockingExecutor {
private final Executor executor;
private final Semaphore semaphore;
public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
this.semaphore = new Semaphore(queueSize + maxPoolSize);
}
private void execImpl (final Runnable command) throws InterruptedException {
semaphore.acquire();
try {
executor.execute(new Runnable() {
@Override
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
// will never be thrown with an unbounded buffer (LinkedBlockingQueue)
semaphore.release();
throw e;
}
}
public void execute (Runnable command) throws InterruptedException {
execImpl(command);
}
}
这个包装类基于Brian Goetz在Java Concurrency in Practice一书中给出的解决方案。本书中的解决方案只需要两个构造函数参数:Executor
和用于信号量的边界。这在Fixpoint给出的答案中显示。这种方法存在一个问题:它可能处于池线程繁忙,队列已满的状态,但信号量刚刚释放了许可证。 (终于阻止的semaphore.release()
)。在此状态下,新任务可以获取刚刚释放的许可,但由于任务队列已满而被拒绝。当然这不是你想要的;你想在这种情况下阻止。
要解决这个问题,我们必须使用一个无界的队列,正如JCiP明确提到的那样。信号量充当保护,产生虚拟队列大小的效果。这具有副作用,即该单元可能包含maxPoolSize + virtualQueueSize + maxPoolSize
任务。这是为什么?因为在finally块中的semaphore.release()
。如果所有池线程同时调用此语句,则释放maxPoolSize
许可,允许相同数量的任务进入该单元。如果我们使用有界队列,它仍然会满,导致被拒绝的任务。现在,因为我们知道这只发生在池线程几乎完成时,这不是问题。我们知道池线程不会阻塞,因此很快就会从队列中获取任务。
您可以使用有界队列。只要确保它的大小等于virtualQueueSize + maxPoolSize
。更大的尺寸是无用的,信号量将阻止更多的物品进入。较小的尺寸将导致被拒绝的任务。任务被拒绝的机会随着大小的减少而增加。例如,假设您想要一个maxPoolSize = 2且virtualQueueSize = 5的有界执行程序。然后获取一个信号量,其中5 + 2 = 7个许可证,实际队列大小为5 + 2 = 7。然后可以在单元中的实际任务数为2 + 5 + 2 = 9。当执行程序已满(队列中的5个任务,线程池中的2个任务,因此0允许可用)并且所有池线程释放其许可时,进入的任务可以准确地获得2个许可。
现在来自JCiP的解决方案使用起来有点麻烦,因为它没有强制执行所有这些约束(无界限队列,或者受这些数学限制等限制)。我认为这只是一个很好的例子来演示如何基于已经可用的部分构建新的线程安全类,而不是作为一个完整的,可重用的类。我不认为后者是作者的意图。
您可以使用这样的自定义RejectedExecutionHandler
ThreadPoolExecutor tp= new ThreadPoolExecutor(core_size, // core size
max_handlers, // max size
timeout_in_seconds, // idle timeout
TimeUnit.SECONDS, queue, new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// This will block if the queue is full
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
System.err.println(e.getMessage());
}
}
});