所以我最近运行了一个基准测试,比较了 3 种情况下嵌套流的性能:
parallelStream
)-这有效地测试`ForkJoinPool.commonPool()ForkJoinPool
这是基准代码(我使用过 JMH):
public class NestedPerf {
@State(Scope.Benchmark)
public static class StateData{
public static final List<Integer> outerLoop = IntStream.range(0, 32).boxed().toList();
public static final List<Integer> innerLoop = IntStream.range(0, 32).boxed().toList();
}
private static void runInNewPool(Runnable task) {
ForkJoinPool pool = new ForkJoinPool();
try {
pool.submit(task).join();
} finally {
pool.shutdown();
}
}
private static void innerParallelLoop() {
StateData.innerLoop.parallelStream().unordered().forEach(i -> {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
private static void innerSequentialLoop() {
StateData.innerLoop.stream().unordered().forEach(i -> {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
@Benchmark
public void testingNewPool(Blackhole bh){
StateData.outerLoop.parallelStream().unordered().forEach(i -> {
runInNewPool(ParallelPerf::innerParallelLoop);
bh.consume(i);
});
}
@Benchmark
public void testingCommonPoolWithSequentialInner(Blackhole bh){
StateData.outerLoop.parallelStream().unordered().forEach(i -> {
innerSequentialLoop();
bh.consume(i);
});
}
@Benchmark
public void testingCommonPool(Blackhole bh){
StateData.outerLoop.parallelStream().unordered().forEach(i -> {
innerParallelLoop();
bh.consume(i);
});
}
}
这是输出:
Benchmark Mode Cnt Score Error Units
NestedPerf.testingCommonPool thrpt 25 1.935 ± 0.005 ops/s
NestedPerf.testingCommonPoolWithSequentialInner thrpt 25 1.744 ± 0.007 ops/s
NestedPerf.testingNewPool thrpt 25 22.648 ± 0.559 ops/s
使用 new Pools 的方法与使用 commonPool 的方法之间的差异令人惊讶。有谁知道为什么创建新池可以使该基准测试速度提高 20 倍左右?
如果有帮助,我将在具有 12 个可用 CPU(十六核 + 超线程)的 Core i7 10850H 系统上运行此程序。
您观察到的性能差异来自于
ForkJoinPool.commonPool()
处理嵌套并行流的方式。当外部流和内部流都使用 parallelStream()
而没有自定义线程池时,它们会竞争公共池中相同的有限线程集。这会导致线程争用和 CPU 资源利用率不足,因为池无法有效管理嵌套并行性。
通过为每个内部流创建一个新的
ForkJoinPool
,您可以为内部任务提供专用线程,从而避免与外部流的线程争用。这允许两个级别的并行性充分利用 CPU 内核。显然,正如您所注意到的,尽管创建新池的开销很大,但它会带来显着的性能提升。
可能的改进:
您可以通过为所有内部流使用共享的自定义
ForkJoinPool
来避免创建多个池的开销。这种方法消除了池创建开销,同时仍然为内部并行性提供单独的线程,从而带来更好的性能。
public class NestedPerf {
private static final ForkJoinPool innerPool = new ForkJoinPool();
@Benchmark
public void testingSharedInnerPool(Blackhole bh){
StateData.outerLoop.parallelStream().unordered().forEach(i -> {
innerPool.submit(() -> StateData.innerLoop.parallelStream().unordered().forEach(j -> {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
})).join();
bh.consume(i);
});
}
}
或者,您可以通过将外部循环和内部循环合并到一个并行流中,将并行性“展平”到单个级别。此方法可以有效地利用并行流,而无需嵌套并行性,通常会产生最快的执行时间,因为它最大化了 CPU 利用率并最小化了开销。
@Benchmark
public void testingFlattenedStream(Blackhole bh){
StateData.outerLoop.parallelStream()
.flatMap(i -> StateData.innerLoop.stream())
.unordered()
.forEach(j -> {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
bh.consume(j);
});
}
如果您追求简单性和速度,那么扁平化可能会胜出。