我正在 Udemy 和 YouTube 的 DMDEV 课程中学习 Java Core。在并发课程的第 2 级中,我可能看到导师在代码中犯了错误。但我向他提出的问题仍然没有得到答复。所以也许这里的专家可以提供帮助。
任务:有一个包含 1_000_000 个元素的数组,用 1 到 300 范围内的随机 int 填充。用 Java 编写代码,使用 10 个线程找到最大元素。
老师提出了这个解决方案:
public class TaskFromDmdevForSOF {
public static void main(String[] args) throws ExecutionException, InterruptedException {
int[] values = new int[1_000_000];
Random random = new Random();
for (int i = 0; i < values.length; i++) {
values[i] = random.nextInt(300) + 1;
}
ExecutorService threadPool = Executors.newFixedThreadPool(10);
int max = findMaxParallel(values, threadPool);
System.out.println(max);
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.MINUTES);
}
private static int findMaxParallel(int[] values, ExecutorService executorService) throws ExecutionException, InterruptedException {
return executorService.submit(() -> IntStream.of(values)
.parallel()
.max()
.orElse(Integer.MIN_VALUE)).get();
}
}
但我很好奇,这个任务下真的有 10 个线程在工作吗? 我在 .parallel() 之后添加了 .peek() ,发现有一个常见的 ForkJoinPool,有 4 个线程在后台工作。 在我看来,我们需要使用 new ForkJoinPool(10) 而不是 Executors.newFixedThreadPool(10) 来保证有真正的 10 个线程在工作。 或者更好的是,我们需要通过扩展 RecursiveTask 来实现我们自己的类来正确解决此任务。 我说得对吗?非常感谢您的解答。
你说得对。您问题中的代码运行未指定数量的线程。
正确答案应该是这样的:
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
int[] values = new int[1_000_000];
Random random = new Random();
for (int i = 0; i < values.length; i++) {
values[i] = random.nextInt(300) + 1;
}
ExecutorService threadPool = Executors.newFixedThreadPool(10);
int step = 1_000_000 / 10;
int max = IntStream.range(0, 10).mapToObj(i ->
threadPool.submit(() -> maxValue(values, i * step, (i + 1) * step)))
.toList()
.stream()
.mapToInt(x -> {
try {
return x.get();
} catch (Throwable e) {
throw new RuntimeException(e);
}
})
.max()
.orElseThrow();
System.out.println(max);
threadPool.shutdown();
}
private static int maxValue(int[] values, int start, int end) {
return Arrays.stream(values, start, end).max().orElseThrow();
}
}
或者像这样:
public class Main {
public static void main(String[] args) throws ExecutionException, InterruptedException {
int[] values = new int[1_000_000];
Random random = new Random();
for (int i = 0; i < values.length; i++) {
values[i] = random.nextInt(300) + 1;
}
ExecutorService threadPool = Executors.newFixedThreadPool(10);
Integer max = threadPool.submit(() ->
threadPool.submit(() ->
threadPool.submit(() ->
threadPool.submit(() ->
threadPool.submit(() ->
threadPool.submit(() ->
threadPool.submit(() ->
threadPool.submit(() ->
threadPool.submit(() ->
threadPool.submit(() ->
Arrays.stream(values).max().orElseThrow()
)
)
)
)
)
)
)
)
)
).get().get().get().get().get().get().get().get().get().get();
System.out.println(max);
threadPool.shutdown();
}
}
如果你今天感觉特别恶毒。