通过Java中的Executor服务包装Stream.parallel()

问题描述 投票:0回答:1

我正在 Udemy 和 YouTube 的 DMDEV 课程中学习 Java Core。在并发课程的第 2 级中,我可能看到导师在代码中犯了错误。但我向他提出的问题仍然没有得到答复。所以也许这里的专家可以提供帮助。

任务:有一个包含 1_000_000 个元素的数组,用 1 到 300 范围内的随机 int 填充。用 Java 编写代码,使用 10 个线程找到最大元素。

老师提出了这个解决方案:

public class TaskFromDmdevForSOF {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int[] values = new int[1_000_000];
        Random random = new Random();
        for (int i = 0; i < values.length; i++) {
            values[i] = random.nextInt(300) + 1;
        }

        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        int max = findMaxParallel(values, threadPool);
        System.out.println(max);
        threadPool.shutdown();
        threadPool.awaitTermination(1, TimeUnit.MINUTES);
    }
    private static int findMaxParallel(int[] values, ExecutorService executorService) throws ExecutionException, InterruptedException {
        return executorService.submit(() -> IntStream.of(values)
                .parallel()
                .max()
                .orElse(Integer.MIN_VALUE)).get();
    }
}

但我很好奇,这个任务下真的有 10 个线程在工作吗? 我在 .parallel() 之后添加了 .peek() ,发现有一个常见的 ForkJoinPool,有 4 个线程在后台工作。 在我看来,我们需要使用 new ForkJoinPool(10) 而不是 Executors.newFixedThreadPool(10) 来保证有真正的 10 个线程在工作。 或者更好的是,我们需要通过扩展 RecursiveTask 来实现我们自己的类来正确解决此任务。 我说得对吗?非常感谢您的解答。

java stream threadpoolexecutor executor forkjoinpool
1个回答
0
投票

你说得对。您问题中的代码运行未指定数量的线程。

正确答案应该是这样的:

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int[] values = new int[1_000_000];
        Random random = new Random();
        for (int i = 0; i < values.length; i++) {
            values[i] = random.nextInt(300) + 1;
        }

        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        int step = 1_000_000 / 10;
        int max = IntStream.range(0, 10).mapToObj(i ->
                threadPool.submit(() -> maxValue(values, i * step, (i + 1) * step)))
            .toList()
            .stream()
            .mapToInt(x -> {
                try {
                    return x.get();
                } catch (Throwable e) {
                    throw new RuntimeException(e);
                }
            })
            .max()
            .orElseThrow();

        System.out.println(max);
        threadPool.shutdown();
    }

    private static int maxValue(int[] values, int start, int end) {
        return Arrays.stream(values, start, end).max().orElseThrow();
    }
}

或者像这样:

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int[] values = new int[1_000_000];
        Random random = new Random();
        for (int i = 0; i < values.length; i++) {
            values[i] = random.nextInt(300) + 1;
        }

        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        Integer max = threadPool.submit(() ->
            threadPool.submit(() ->
                threadPool.submit(() ->
                    threadPool.submit(() ->
                        threadPool.submit(() ->
                            threadPool.submit(() ->
                                threadPool.submit(() ->
                                    threadPool.submit(() ->
                                        threadPool.submit(() ->
                                            threadPool.submit(() ->
                                                Arrays.stream(values).max().orElseThrow()
                                            )
                                        )
                                    )
                                )
                            )
                        )
                    )
                )
            )
        ).get().get().get().get().get().get().get().get().get().get();
        System.out.println(max);
        threadPool.shutdown();
    }
}

如果你今天感觉特别恶毒。

© www.soinside.com 2019 - 2024. All rights reserved.