Java 17 parallel streams do not yield better performance

Question    Votes: 0    Answers: 1

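I am using a parallel stream to process a stream of 20,000 elements. For each element I call a method that performs some computation while holding a Java readLock.

When I run the code with JMH and compare the serial version against the parallel one, the parallel version is not faster; the JMH numbers are nearly identical.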

public class ParallelEvaluateTest {

@Test
public void benchmark() throws Exception {
    System.out.println("processors: " + Runtime.getRuntime().availableProcessors());
    Options opt = new OptionsBuilder()
            .include(this.getClass().getSimpleName())
            .forks(0)
            .measurementIterations(1)
            .warmupIterations(1)
            .build();


    new Runner(opt).run();
}

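/*
@Fork(value = 3, warmups = 2) asks for 3 measured forks preceded by 2 warmup forks
(forks, not iterations). The Options passed to the Runner in benchmark() take
precedence over these annotations, so forks(0) applies when run through that method.
 */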
@Benchmark
@Fork(value = 3, warmups = 2)
@BenchmarkMode({Mode.Throughput, Mode.AverageTime})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Test
public void evaluateParallelTest() {
    final ConcurrentHashMap<Map<String, String>, String> resultMap = new ConcurrentHashMap<>();
    final List<Map<String, String>> ruleInputList = createTestDataInput();

    assertEquals(ruleInputList.size(), 20000);

    ruleInputList.parallelStream().forEach(ruleInput -> {
                resultMap.put(ruleInput, ruleSystem1.evaluate(ruleInput));
            }
    );
    validateTestOutput(resultMap);
}

@Benchmark
@Fork(value = 3, warmups = 2)
@BenchmarkMode({Mode.Throughput, Mode.AverageTime})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Test
public void evaluateSerialTest() {
    final ConcurrentHashMap<Map<String, String>, String> resultMap = new ConcurrentHashMap<>();
    List<Map<String, String>> ruleInputList = createTestDataInput();

    assertEquals(ruleInputList.size(), 20000);

    ruleInputList.stream().forEach(ruleInput -> {
                resultMap.put(ruleInput, ruleSystem1.evaluate(ruleInput));
            }
    );
    validateTestOutput(resultMap);
}

private static  List<Map<String, String>> createTestDataInput() {
    final List<Map<String, String>> ruleInputList = new ArrayList<>();
    Map<String, String> inputMap = new HashMap<>();

    for (int i =1; i <= 20000; i++) {
        inputMap = new HashMap<>();
        inputMap.put("input_1", String.valueOf(i));
        inputMap.put("input_2", String.valueOf(i));
        ruleInputList.add(inputMap);
    }
    return ruleInputList;
}

private void validateTestOutput(ConcurrentHashMap<Map<String, String>, String> resultMap) {
    assertEquals(resultMap.size(), 20000);
    resultMap.entrySet().removeIf(entry -> entry.getValue().equals(""));
    assertEquals(resultMap.size(), 7);
}

}

I also set the VM argument -Djava.util.concurrent.ForkJoinPool.common.parallelism=4, which equals the number of available processors on the machine.

Even so, these are the results I get:
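One quick sanity check is whether the property actually reached the JVM that runs the benchmark. The sketch below is not from the original post, and the class name CommonPoolCheck is made up for illustration. It prints the parallelism reported by the common pool (the pool parallel streams use by default) and the names of the threads that actually processed elements. Note that the calling thread also takes part in the work, so the thread set can contain one more entry than the configured parallelism.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CommonPoolCheck {

    // Parallelism of the common ForkJoinPool, as seen by this JVM.
    public static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    // Names of the threads that executed at least one element of a parallel stream.
    public static Set<String> threadsUsed(int elements) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, elements).parallel()
                .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + commonPoolParallelism());
        System.out.println("threads used: " + threadsUsed(20_000));
    }
}
```

If the reported parallelism does not match the value passed with -D, the property was probably set on a different JVM than the one executing the stream.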

Benchmark                                   Mode  Cnt    Score   Error   Units
ParallelEvaluateTest.evaluateParallelTest  thrpt         0.001          ops/ms
ParallelEvaluateTest.evaluateSerialTest    thrpt         0.001          ops/ms
ParallelEvaluateTest.evaluateParallelTest   avgt       790.351           ms/op
ParallelEvaluateTest.evaluateSerialTest     avgt       799.799           ms/op
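The two modes report the same measurement from opposite directions: throughput in ops/ms is roughly the reciprocal of the average time in ms/op. A small sketch (the class name JmhUnits is hypothetical and not part of JMH) makes the arithmetic explicit and suggests why the thrpt rows look identical: at this scale the difference only shows up beyond the third decimal place.

```java
public class JmhUnits {

    // Convert an average time in ms/op into a throughput in ops/ms.
    public static double opsPerMs(double msPerOp) {
        return 1.0 / msPerOp;
    }

    public static void main(String[] args) {
        double parallelMs = 790.351; // avgt score of evaluateParallelTest
        double serialMs = 799.799;   // avgt score of evaluateSerialTest

        System.out.printf("parallel: %.5f ops/ms%n", opsPerMs(parallelMs));
        System.out.printf("serial:   %.5f ops/ms%n", opsPerMs(serialMs));
        System.out.printf("relative difference: %.1f%%%n",
                (serialMs - parallelMs) / serialMs * 100);
    }
}
```

For a comparison like this one, the avgt rows are the more readable of the two.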

So, in short, the parallel stream does not perform any better than the serial stream. The parallel stream is created here:

ruleInputList.parallelStream().forEach(ruleInput -> {
    resultMap.put(ruleInput, ruleSystem1.evaluate(ruleInput));
});
java concurrency parallel-processing stream jmh
1 Answer
0 votes

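If you want to see how parallelism improves the running time, measure only the part that runs in parallel; data generation and validation should be kept out of the measurement (I mentioned this in the comments; see Amdahl's law). On top of that, 20k elements is not enough on its own to show a significant difference. Consider the following code: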

import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;

public class Test {

    public static void main(String[] args) throws Exception {
        System.out.println("procssors: " + Runtime.getRuntime().availableProcessors());
        // Build the input once, outside the timed region.
        List<Map<String, String>> data = createTestDataInput();
        // Warm-up: run both variants once before timing.
        evaluate(data, true);
        evaluate(data, false);

        // Time only the stream processing itself.
        long start = System.currentTimeMillis();
        evaluate(data, true);
        long pend = System.currentTimeMillis();   // end of the parallel run
        evaluate(data, false);
        long send = System.currentTimeMillis();   // end of the sequential run

        System.err.println("Parallel: " + (pend - start) + "ms");
        System.err.println("Sequencial: " + (send - pend) + "ms");
    }

    public static void evaluate(List<Map<String, String>> ruleInputList, boolean p) {
        final ConcurrentHashMap<Map<String, String>, String> resultMap = new ConcurrentHashMap<>();
        if(p) {
            ruleInputList.parallelStream().forEach(ruleInput -> {
                        resultMap.put(ruleInput, "a");
                    }); 
        } else {
            ruleInputList.stream().forEach(ruleInput -> {
                        resultMap.put(ruleInput, "b");
                    }); 
        }   
    }   

    // Builds 2,000,000 input maps, 100 times the 20,000 used in the question.
    private static List<Map<String, String>> createTestDataInput() {
        final List<Map<String, String>> ruleInputList = new ArrayList<>();

        for (int i = 1; i <= 2000000; i++) {
            Map<String, String> inputMap = new HashMap<>();
            inputMap.put("input_1", String.valueOf(i));
            inputMap.put("input_2", String.valueOf(i));
            ruleInputList.add(inputMap);
        }
        return ruleInputList;
    }
}


Output:

procssors: 8
Parallel: 302ms
Sequencial: 545ms
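
The Amdahl's law point can be made concrete with a rough estimate: if a fraction p of the measured work can run in parallel on n workers, the best possible speedup is 1 / ((1 - p) + p / n). In the sketch below, the class name AmdahlEstimate and the parallel fractions are assumptions chosen for illustration, not measurements taken from the question.

```java
public class AmdahlEstimate {

    // Upper bound on speedup when parallelFraction of the work is spread over `workers` threads.
    public static double speedup(double parallelFraction, int workers) {
        return 1.0 / ((1.0 - parallelFraction) + parallelFraction / workers);
    }

    public static void main(String[] args) {
        int workers = 4; // same value as the parallelism setting in the question
        for (double p : new double[] {0.1, 0.5, 0.9}) {
            System.out.printf("parallel fraction %.1f -> at most %.2fx on %d workers%n",
                    p, speedup(p, workers), workers);
        }
    }
}
```

If data generation and validation make up most of each benchmarked call, the parallel fraction is small and the ceiling stays close to 1x no matter how many cores are available.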