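I'm using a parallel stream to process a stream of 20,000 elements, and for each element I call a method that does some computation under a Java read lock (readLock).
When I run the code with JMH and compare the serial version with the parallel version, the parallel version is not faster: the JMH numbers are about the same.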
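The question does not show the method being called, so the following is only a hypothetical stand-in for something like `ruleSystem1.evaluate(...)`: a rule table read under the read lock of a `ReentrantReadWriteLock`. The class name, the rule table and the `addRule` method are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for the question's ruleSystem1; the real class is not shown.
final class ReadLockRuleSystem {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Shared rule table (value of "input_1" -> result), guarded by lock.
    private final Map<String, String> rules = new HashMap<>();

    // Writers take the exclusive write lock.
    void addRule(String input1, String result) {
        lock.writeLock().lock();
        try {
            rules.put(input1, result);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Several threads may hold the read lock at the same time; each
    // acquire/release still performs an atomic update of the lock's shared state.
    String evaluate(Map<String, String> input) {
        lock.readLock().lock();
        try {
            return rules.getOrDefault(input.get("input_1"), "");
        } finally {
            lock.readLock().unlock();
        }
    }
}
```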
```java
import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class ParallelEvaluateTest {

    // ruleSystem1 (declaration not shown in the question) provides evaluate(Map<String, String>).

    @Test
    public void benchmark() throws Exception {
        System.out.println("processors: " + Runtime.getRuntime().availableProcessors());
        Options opt = new OptionsBuilder()
                .include(this.getClass().getSimpleName())
                .forks(0)
                .measurementIterations(1)
                .warmupIterations(1)
                .build();
        new Runner(opt).run();
    }

    /*
     * @Fork(value = 3, warmups = 2) requests 2 discarded warmup forks followed by
     * 3 measured forks (forks, not iterations). The forks(0), warmupIterations(1)
     * and measurementIterations(1) set in benchmark() above override these annotations.
     */
    @Benchmark
    @Fork(value = 3, warmups = 2)
    @BenchmarkMode({Mode.Throughput, Mode.AverageTime})
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    @Test
    public void evaluateParallelTest() {
        final ConcurrentHashMap<Map<String, String>, String> resultMap = new ConcurrentHashMap<>();
        final List<Map<String, String>> ruleInputList = createTestDataInput();
        assertEquals(20000, ruleInputList.size());
        ruleInputList.parallelStream().forEach(ruleInput -> {
            resultMap.put(ruleInput, ruleSystem1.evaluate(ruleInput));
        });
        validateTestOutput(resultMap);
    }

    @Benchmark
    @Fork(value = 3, warmups = 2)
    @BenchmarkMode({Mode.Throughput, Mode.AverageTime})
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    @Test
    public void evaluateSerialTest() {
        final ConcurrentHashMap<Map<String, String>, String> resultMap = new ConcurrentHashMap<>();
        final List<Map<String, String>> ruleInputList = createTestDataInput();
        assertEquals(20000, ruleInputList.size());
        ruleInputList.stream().forEach(ruleInput -> {
            resultMap.put(ruleInput, ruleSystem1.evaluate(ruleInput));
        });
        validateTestOutput(resultMap);
    }

    private static List<Map<String, String>> createTestDataInput() {
        final List<Map<String, String>> ruleInputList = new ArrayList<>();
        for (int i = 1; i <= 20000; i++) {
            Map<String, String> inputMap = new HashMap<>();
            inputMap.put("input_1", String.valueOf(i));
            inputMap.put("input_2", String.valueOf(i));
            ruleInputList.add(inputMap);
        }
        return ruleInputList;
    }

    private void validateTestOutput(ConcurrentHashMap<Map<String, String>, String> resultMap) {
        assertEquals(20000, resultMap.size());
        // Drop empty results; exactly 7 inputs are expected to produce a non-empty value.
        resultMap.entrySet().removeIf(entry -> entry.getValue().equals(""));
        assertEquals(7, resultMap.size());
    }
}
```
I also set the VM argument -Djava.util.concurrent.ForkJoinPool.common.parallelism=4, which equals the number of available processors on the machine.
Even so, these are the results I get:
```
Benchmark                                   Mode  Cnt    Score   Error   Units
ParallelEvaluateTest.evaluateParallelTest  thrpt         0.001          ops/ms
ParallelEvaluateTest.evaluateSerialTest    thrpt         0.001          ops/ms
ParallelEvaluateTest.evaluateParallelTest   avgt       790.351           ms/op
ParallelEvaluateTest.evaluateSerialTest     avgt       799.799           ms/op
```
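To confirm what the `-D` setting above actually gives the common pool, and how many threads end up working on a parallel stream, a quick check could look like the sketch below (class and method names are hypothetical). The common pool's parallelism counts worker threads only; the thread that starts the stream also processes elements, so with `parallelism=4` up to five threads can take part.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

// Hypothetical helper for inspecting the common ForkJoinPool.
final class CommonPoolCheck {
    // Parallelism the common pool uses: by default availableProcessors() - 1,
    // or the value of -Djava.util.concurrent.ForkJoinPool.common.parallelism if set.
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    // Names of the threads that processed elements of a parallel stream.
    // The calling thread participates too, alongside the common-pool workers.
    static Set<String> threadsUsed(int elements) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, elements).parallel()
                 .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }
}
```

Printing both values at the start of `benchmark()` would show whether the flag reached the JVM that runs the benchmark (with `forks(0)` that is the test JVM itself).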
So, in short, the parallel stream performs no better than the serial stream. The parallel stream is created here:
```java
ruleInputList.parallelStream().forEach(ruleInput -> {
    resultMap.put(ruleInput, ruleSystem1.evaluate(ruleInput));
});
```
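As an aside, the same result map can be built without side effects inside `forEach` by letting a concurrent collector create it. This is a sketch, not what the question uses; the `evaluator` parameter is a hypothetical stand-in for `ruleSystem1::evaluate`. It only changes how results are gathered, not the cost of each `evaluate` call, so it should not be expected to change the timings on its own.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper: collect results instead of calling put() from forEach.
final class CollectInsteadOfForEach {
    static ConcurrentMap<Map<String, String>, String> evaluateAll(
            List<Map<String, String>> inputs,
            Function<Map<String, String>, String> evaluator) {
        // toConcurrentMap lets all threads insert into one shared ConcurrentMap.
        return inputs.parallelStream()
                     .collect(Collectors.toConcurrentMap(Function.identity(), evaluator));
    }
}
```

`Collectors.toConcurrentMap` throws `IllegalStateException` on duplicate keys, which is fine here because every generated input map has a distinct `input_1` value.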
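If you want to see how parallelism improves the running time, measure only the part that runs in parallel; keep data generation and validation out of the measurement (I mentioned this in the comments; see Amdahl's law). Besides that, 20k elements are not enough to show any significant difference. Look at the code below: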
```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class Test {
    public static void main(String[] args) throws Exception {
        System.out.println("processors: " + Runtime.getRuntime().availableProcessors());
        List<Map<String, String>> data = createTestDataInput();

        // Warm up both code paths once before timing.
        evaluate(data, true);
        evaluate(data, false);

        // Time only the stream processing; the data was generated above.
        long start = System.currentTimeMillis();
        evaluate(data, true);
        long pend = System.currentTimeMillis();
        evaluate(data, false);
        long send = System.currentTimeMillis();

        System.err.println("Parallel: " + (pend - start) + "ms");
        System.err.println("Sequential: " + (send - pend) + "ms");
    }

    public static void evaluate(List<Map<String, String>> ruleInputList, boolean p) {
        final ConcurrentHashMap<Map<String, String>, String> resultMap = new ConcurrentHashMap<>();
        if (p) {
            ruleInputList.parallelStream().forEach(ruleInput -> {
                resultMap.put(ruleInput, "a");
            });
        } else {
            ruleInputList.stream().forEach(ruleInput -> {
                resultMap.put(ruleInput, "b");
            });
        }
    }

    private static List<Map<String, String>> createTestDataInput() {
        final List<Map<String, String>> ruleInputList = new ArrayList<>();
        for (int i = 1; i <= 2000000; i++) {
            Map<String, String> inputMap = new HashMap<>();
            inputMap.put("input_1", String.valueOf(i));
            inputMap.put("input_2", String.valueOf(i));
            ruleInputList.add(inputMap);
        }
        return ruleInputList;
    }
}
```
```
processors: 8
Parallel: 302ms
Sequential: 545ms
```
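The Amdahl's law remark can be made concrete. If a fraction p of the measured work can run in parallel on n threads and the rest stays serial, the best possible speedup is 1 / ((1 - p) + p / n). The sketch below just evaluates that formula (the class name is hypothetical). For example, p = 0.5 and n = 4 gives 1 / (0.5 + 0.125) = 1.6, so a benchmark that also times serial data generation and validation can report a much smaller gain than the stream on its own.

```java
// Hypothetical helper that evaluates Amdahl's law.
final class AmdahlSketch {
    // Upper bound on speedup when a fraction p (0..1) of the work runs on n threads
    // and the remaining (1 - p) stays serial.
    static double speedup(double p, int n) {
        if (p < 0.0 || p > 1.0 || n < 1) {
            throw new IllegalArgumentException("need 0 <= p <= 1 and n >= 1");
        }
        return 1.0 / ((1.0 - p) + p / n);
    }
}
```

Even a fully parallel workload (p = 1) is capped at n, which is why the serial parts of a measurement matter as much as the number of cores.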