我有一个自定义的ExecutorService,其中包含ScheduledExecutorService,可用于中断提交给ExecutorSerice的任务,如果它们花费的时间太长,请将complet类放在此文章的末尾。
这很好,只是有时中断本身会引起问题,所以我改为在新的CanceableTask类中添加了一个易失的boolean cancel标志,并使它们成为该类的子类,以便他们可以检查并干净地停止自己如果它们的布尔值已发送为true。请注意,它们是提交给执行程序服务precisley的每个类中的boolean实例,因此可以取消长时间运行的任务而无需取消其他任务。
但是FutureTask作为参数传递给beforeExecute(Thread t,Runnable r),并且这不能访问Callable类,因此我的超时代码无法设置取消标志。
我通过重写newTaskFor方法以返回仅提供对Callable的引用的类来解决此问题
public class FutureCallable<V> extends FutureTask<V>
{
private Callable<V> callable;
public FutureCallable(Callable<V> callable) {
super(callable);
this.callable = callable;
}
public Callable<V> getCallable() {
return callable;
}
}
并且一切都很好,或者我想。
[不幸的是,当新任务提交给ExecutorService并最终耗尽内存时,我的应用程序现在使用越来越多的内存,当我对应用程序进行配置时,我发现即使所有的FutureCallables都存在对所有FutureCallables的线程堆栈本地引用。任务已完成,并且因为FutureCallable引用了正在运行的类,所以它使用了大量内存。
[当我查看(FutureCallable扩展的)FutureTask的代码时,有一个私有的Callable引用的注释,上面写着
/** The underlying callable; nulled out after running */
因此,如何改善FutureCallable使其对Callable的引用无效?或为什么在任务完成后仍保留对FutureCallable的引用。
我已经确认,如果我注释掉newTaskFor方法,则不会占用过多的内存,但是不幸的是,那时我无法取消该类。
完整类是:
public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
private final long timeout;
private final TimeUnit timeoutUnit;
private final static int WAIT_BEFORE_INTERRUPT = 10000;
private final static int WAIT_BEFORE_STOP = 10000;
private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
//Map Task to the Future of the Timeout Task that could be used to interrupt it
private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();
public long getTimeout()
{
return timeout;
}
public TimeUnit getTimeoutUnit()
{
return timeoutUnit;
}
public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit)
{
super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
MainWindow.logger.severe("Init:"+workerSize+":Timeout:"+timeout+":"+timeoutUnit);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
@Override
public <T> FutureCallable<T> newTaskFor(Callable<T> callable) {
return new FutureCallable<T>(callable);
}
@Override
public List<Runnable> shutdownNow() {
timeoutExecutor.shutdownNow();
return super.shutdownNow();
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if(timeout > 0) {
//Schedule a task to interrupt the thread that is running the task after time timeout starting from now
final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t, r), timeout, timeoutUnit);
//Add Mapping
runningTasks.put(r, scheduled);
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
//AfterExecute will be called after the task has completed, either of its own accord or because it
//took too long and was interrupted by corresponding timeout task
//Remove mapping and cancel timeout task
ScheduledFuture timeoutTask = runningTasks.remove(r);
if(timeoutTask != null) {
timeoutTask.cancel(false);
}
}
@Override
protected void terminated()
{
//All tasks have completed either naturally or via being cancelled by timeout task so close the timeout task
MainWindow.logger.severe("---Shutdown TimeoutExecutor");
timeoutExecutor.shutdown();
}
/**
* Interrupt or possibly stop the thread
*
*/
class TimeoutTask implements Runnable {
private final Thread thread;
private Callable c;
public TimeoutTask(Thread thread, Runnable c) {
this.thread = thread;
if(c instanceof FutureCallable)
{
this.c = ((FutureCallable) c).getCallable();
}
}
@Override
public void run()
{
String msg = "";
if (c != null)
{
if (c != null && c instanceof CancelableTask)
{
MainWindow.logger.severe("+++Cancelling " + msg + " task because taking too long");
((CancelableTask) c).setCancelTask(true);
}
}
}
}
}
public abstract class CancelableTask extends ExecutorServiceEnabledAnalyser
{
private volatile boolean cancelTask = false;
public boolean isCancelTask() {
return cancelTask;
}
public void setCancelTask(boolean cancelTask) {
this.cancelTask = cancelTask;
}
CancelableTask(final MainWindow start, boolean isSelectedRecords, boolean isUseRowSelection)
{
super(start, isSelectedRecords, isUseRowSelection);
}
CancelableTask(final MainWindow start, List<MetadataChangedWrapper> songs)
{
super(start, songs );
}
}
ThreadLocal
在哪里?我发现您在说什么很奇怪,很难相信,它始终引用曾经运行过的所有任务,即使完成后也是如此。如果是这种情况,即使没有您的覆盖,它最终也应该耗尽内存(任务本身使用了一些内存,尽管可能少于您的可调用内存,但仍不是零)。
无论如何,执行后,您可以覆盖done
上的FutureCallable
方法以使包装的对象无效。