我需要运行五个线程来每隔20秒重复从API获取数据,所以我使用了ScheduledExecutorService。
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 5; i++) {
scheduler.scheduleWithFixedDelay(Constant.workerThread[i], 0, delay, TimeUnit.SECONDS);
}
我怎么知道(每次)执行五个线程的时间?
编辑:似乎人们并没有真正理解代码片段。我会把它变得大胆,以便没有人再次来到我身边,在外部管理内部ExecutorService
,而不是在Callable
lambda内部,在需要时采取适当的谨慎措施有序地关闭它。
您可以做的是管理单个计划任务,在该计划任务内部执行您的五个工作程序。
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
scheduler.scheduleWithFixedDelay(
() -> {
final ExecutorService e = Executors.newFixedThreadPool(5);
final CompletionService<?> cs = new ExecutorCompletionService<>(e);
for (int i = 0; i < 5; i++) {
// Constant.workerThread[i] is a Runnable
cs.submit(Constant.workerThread[i], null);
}
for (int i = 0; i < 5; i++) {
try {
// Will block until a Future<?> result is available.
// Doesn't matter which one, it will take the first available.
cs.take();
} catch (final InterruptedException ignored) {
// Ingore the exception, as we care only
// about if all off them finished (abruptly or not)
}
}
// All of them finished!
e.shutdown();
}, 0, 20, TimeUnit.SECONDS));
用于ExecutorCompletionService
的JavaDoc
使用提供的
CompletionService
执行任务的Executor
。该类安排提交的任务在完成后放置在可使用take
访问的队列中。
用于ExecutorCompletionService#take
的JavaDoc
检索并删除代表下一个已完成任务的
Future
,等待(如果还没有)任务。
这必须改进,但你应该明白。
Runnable
objects, not threads线程不是“执行”的。
你应该在打电话给Runnable
时传递ScheduledExecutorService::scheduleAtFixdDelay
。我担心你的命名Constant.workerThread
。你不传递线程,你传递任务在某个线程上运行。你不必担心哪个线程运行什么Runnable
任务。你根本不需要关心线程。处理要在线程上运行的任务是执行程序的工作,因此就是名称。
您似乎缺少线程和任务的基本概念。这并不奇怪,因为它在首次开始时是一个棘手的主题。我建议Oracle.com免费提供有关线程和执行程序的Java教程。然后进行一些互联网搜索以了解更多信息。最后你应该学习Brian Goetz等人的着作,Java Concurrency In Practice。
ScheduledFuture
tracks completion所以你不会监视线程。而是专注于你的Runnable
任务。要监控其状态,请通过调用ScheduledFuture
捕获scheduleAtFixedDelay
对象返回。目前,您忽略了那些返回的对象。
该计划的未来对象提供了查看任务是完成还是已取消的方法。您也可以取消该任务。
如果您可以更改计划任务的源代码,则可以实现以下内容:
public class ScheduleExample {
private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
static abstract class RunnableWithNotification implements Runnable {
@Override
public final void run() {
beforeRun();
doRun();
afterRun();
}
public abstract void doRun();
public abstract void beforeRun();
public abstract void afterRun();
}
public static void main(String... args) {
long delay = 5;
List<Runnable> tasks = Arrays.asList(
newRunnableWithNotification(1),
newRunnableWithNotification(2),
newRunnableWithNotification(3),
newRunnableWithNotification(4),
newRunnableWithNotification(5));
tasks.forEach(task -> scheduler.scheduleWithFixedDelay(task, 0, delay, TimeUnit.SECONDS));
}
private static Runnable newRunnableWithNotification(int i) {
return new RunnableWithNotification() {
@Override
public void doRun() {
System.out.println("Executing task " + i);
}
@Override
public void beforeRun() {
System.out.println("Before executing task " + i);
}
@Override
public void afterRun() {
System.out.println("After executed task " + i);
}
};
}
}
您可以设置一种执行映射。 始终关于每个可运行的执行状态的新数据。这是一个通用示例,因此您需要根据自己的需要进行调整。
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Application {
// Your mocked runnables
private static List<Runnable> workerRunnables = new ArrayList<>();
// This will be the map with always updated values, get the map at[i]
// will return if workerThread[i is running]
private static Map<Integer, Boolean> executionMap = new ConcurrentHashMap<>();
private static final int threadPoolSize = 5;
public static void main(String[] args) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(threadPoolSize);
for (int i = 0; i < threadPoolSize; i++) {
int finalI = i;
workerRunnables.add(() -> {
try {
// Update the map, the runnable has started
executionMap.put(finalI, true);
// Simulating your API calls with different types of delay
Thread.sleep(3000);
if (finalI == 2) {
Thread.sleep(1000);
}
// Update the map, the runnable has finished
executionMap.put(finalI, false);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
workerRunnables.forEach(worker -> scheduler.scheduleWithFixedDelay(worker, 0, 2, TimeUnit.SECONDS));
Executors.newCachedThreadPool().execute(new Runnable() {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@Override
public void run() {
scheduler.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
for (int i = 0; i < threadPoolSize; i++) {
System.out.println("Runnable number " + i +" is running: " + executionMap.get(i));
}
}
}, 0, 2, TimeUnit.SECONDS);
}
});
}
}