在这种情况下,为什么使用新池的内部并行流比使用 commonPool 更快?

问题描述 投票:0回答:1

所以我最近运行了一个基准测试,比较了 3 种情况下嵌套流的性能:

  • 并行外流和顺序内流
  • 并行的外部和内部流(使用
    parallelStream
    )-这有效地测试`ForkJoinPool.commonPool()
  • 并行的外部和内部流,但内部流为每个任务创建新的
    ForkJoinPool

这是基准代码(我使用过 JMH):

public class NestedPerf {
  @State(Scope.Benchmark)
  public static class StateData{
    public static final List<Integer> outerLoop = IntStream.range(0, 32).boxed().toList();
    public static final List<Integer> innerLoop = IntStream.range(0, 32).boxed().toList();
  }
  private static void runInNewPool(Runnable task) {
    ForkJoinPool pool = new ForkJoinPool();
    try {
      pool.submit(task).join();
    } finally {
      pool.shutdown();
    }
  }
  private static void innerParallelLoop() {
    StateData.innerLoop.parallelStream().unordered().forEach(i -> {
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
  }
  private static void innerSequentialLoop() {
    StateData.innerLoop.stream().unordered().forEach(i -> {
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
  }
  @Benchmark
  public void testingNewPool(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      runInNewPool(ParallelPerf::innerParallelLoop);
      bh.consume(i);
    });
  }

  @Benchmark
  public void testingCommonPoolWithSequentialInner(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      innerSequentialLoop();
      bh.consume(i);
    });
  }
  @Benchmark
  public void testingCommonPool(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      innerParallelLoop();
      bh.consume(i);
    });
  }
}

这是输出:

Benchmark                                         Mode  Cnt   Score   Error  Units
NestedPerf.testingCommonPool                     thrpt   25   1.935 ± 0.005  ops/s
NestedPerf.testingCommonPoolWithSequentialInner  thrpt   25   1.744 ± 0.007  ops/s
NestedPerf.testingNewPool                        thrpt   25  22.648 ± 0.559  ops/s

使用 new Pools 的方法与使用 commonPool 的方法之间的差异令人惊讶。有谁知道为什么创建新池可以使该基准测试速度提高 20 倍左右?

如果有帮助,我将在具有 12 个可用 CPU(十六核 + 超线程)的 Core i7 10850H 系统上运行此程序。

java performance java-stream benchmarking
1个回答
0
投票

您观察到的性能差异来自于

ForkJoinPool.commonPool()
处理嵌套并行流的方式。当外部流和内部流都使用
parallelStream()
而没有自定义线程池时,它们会竞争公共池中相同的有限线程集。这会导致线程争用和 CPU 资源利用率不足,因为池无法有效管理嵌套并行性。

通过为每个内部流创建一个新的

ForkJoinPool
,您可以为内部任务提供专用线程,从而避免与外部流的线程争用。这允许两个级别的并行性充分利用 CPU 内核。显然,正如您所注意到的,尽管创建新池的开销很大,但它会带来显着的性能提升。

可能的改进:

您可以通过为所有内部流使用共享的自定义

ForkJoinPool
来避免创建多个池的开销。这种方法消除了池创建开销,同时仍然为内部并行性提供单独的线程,从而带来更好的性能。

public class NestedPerf {
  private static final ForkJoinPool innerPool = new ForkJoinPool();

  @Benchmark
  public void testingSharedInnerPool(Blackhole bh){
    StateData.outerLoop.parallelStream().unordered().forEach(i -> {
      innerPool.submit(() -> StateData.innerLoop.parallelStream().unordered().forEach(j -> {
        try {
          Thread.sleep(5);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      })).join();
      bh.consume(i);
    });
  }
}

或者,您可以通过将外部循环和内部循环合并到一个并行流中,将并行性“展平”到单个级别。此方法可以有效地利用并行流,而无需嵌套并行性,通常会产生最快的执行时间,因为它最大化了 CPU 利用率并最小化了开销。 @Benchmark public void testingFlattenedStream(Blackhole bh){ StateData.outerLoop.parallelStream() .flatMap(i -> StateData.innerLoop.stream()) .unordered() .forEach(j -> { try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } bh.consume(j); }); }

如果您追求简单性和速度,那么扁平化可能会胜出。

© www.soinside.com 2019 - 2024. All rights reserved.