我有一个
Runnable
任务列表(例如,它包含 100 个任务,每个任务随机花费 1 - 10 秒)。 任务必须并行运行并且来自ExecutorService
提供的线程池(例如我的系统有4个CPU,那么它应该同时运行4个任务)。
问题是:我想知道 100 个任务列表中哪些任务运行时间超过 5 秒,并且它们应该在 5 秒后终止(带有任务 ID 日志),以便为其他任务腾出位置。
我已经用Future
查看了
executorService.submit(Runnable task)
,但是
Future.get()
方法会阻塞主线程,这不是我想要的。任何建议都会很棒。
public class TestExecutorService {
private static final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 10);
public static void main(String[] args) throws InterruptedException {
List<Callable<Object>> tasks = new ArrayList<>();
for (int i = 0; i < 100; i++) {
int finalI = i;
int min = 1;
int max = 8;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
int sleepTime = min + (int)(Math.random() * ((max - min) + 1));
System.out.println("## Thread: " + finalI + " will sleep: " + sleepTime + " seconds.");
Thread.sleep(sleepTime * 1000);
System.out.println("## Thread: " + finalI + " finished after: " + sleepTime + " seconds");
} catch (InterruptedException e) {
System.out.println("Thread is cancelled!");
}
}
};
tasks.add(Executors.callable(runnable));
}
// How to make a Runnable task timeout after 5 seconds when running other tasks in parallel
// instead of total time for 100 tasks in 5 seconds?
executorService.invokeAll(tasks, 5, TimeUnit.SECONDS);
executorService.shutdown();
}
}
List<Runnable> tasks = ...;
ExecutorService executor = ...;
现在您想要执行每个任务,获取其花费的时间,如果花费太长时间则取消该任务。我建议安排一个超时操作。
ScheduledExecutorService timeoutService = Executors.newSingleThreadScheduledExecutor();
现在当您提交任务时。
List<Future<Long>> results = new ArrayList<>();
for(int i = 0; i<tasks.size(); i++){
Runnable task = tasks.get(i);
Future<Long> future = executor.submit( () ->{
long start = System.currentTimeMillis();
task.run();
return System.currentTimeMillis() - start;
});
Future<?> timeout = timeoutService.schedule( ()->{
if(!future.isDone()){
future.cancel(true);
}
}, 5, TimeUnit.SECONDS);
results.add(future);
}
现在您只需浏览 results
并在所有任务完成后调用
get
,无论是异常还是正常,您都会完成结果列表的浏览。这假设您的任务可以取消或中断如果不能,那么您可以使用超时期货。
ExecutorService
单线程,它将在每个
Runnable
任务中调用。它有自己的超时,因此不会干扰线程池的其他任务。完整代码如下:
package com.company;
import java.sql.Time;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class TestThreads {
private static final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
public static void main(String[] args) throws InterruptedException {
List<Callable<Object>> tasks = new ArrayList<>();
List<Runnable> runnables = new ArrayList<>();
for (int i = 0; i < 100; i++) {
int finalI = i;
int min = 1;
int max = 20;
Runnable runnable = new Runnable() {
@Override
public void run() {
String threadName = "### Thread: " + finalI;
long maxTime = 5000;
ExecutorService executorServiceTmp = Executors.newSingleThreadExecutor();
Callable<Object> callable = () -> {
int sleepTime = min + (int) (Math.random() * ((max - min) + 1));
System.out.println("## Thread: " + finalI + " will sleep: " + sleepTime + " seconds.");
Thread.sleep(sleepTime * 1000);
System.out.println("## Thread: " + finalI + " finished after: " + sleepTime + " seconds");
return null;
};
long startTime = System.currentTimeMillis();
try {
executorServiceTmp.invokeAll(Arrays.asList(callable), maxTime, TimeUnit.MILLISECONDS);
} catch (Exception e) {
System.out.println("Thread: " + threadName + " is cancelled.");
} finally {
executorServiceTmp.shutdown();
}
long endTime = System.currentTimeMillis();
long totalTime = endTime - startTime;
if (totalTime >= maxTime) {
System.out.println("(!) Thread: " + threadName + " is cancelled after " + maxTime + " ms");
}
}
};
tasks.add(Executors.callable(runnable));
runnables.add(runnable);
}
executorService.invokeAll(tasks);
System.out.println("### All threads fininshed");
executorService.shutdown();
}
}