Rxjava2,Java 8 Streams,Plain Old Iteration之间的性能比较

问题描述 投票:5回答:1

我已经成为Java 8中Java函数编程以及Rx java的忠实粉丝。但是一位同事最近指出,使用这些产品会影响性能。所以决定运行JMH Bench标记,但看起来他是对的。无论我做什么,我都无法获得流版本以提供更好的性能。以下是我的代码

@OutputTimeUnit(TimeUnit.NANOSECONDS)
@BenchmarkMode(Mode.AverageTime)
@OperationsPerInvocation(StreamVsVanilla.N)
public class StreamVsVanilla {
    public static final int N = 10000;

    static List<Integer> sourceList = new ArrayList<>(N);
    static {
        for (int i = 0; i < N; i++) {
            sourceList.add(i);
        }
    }

    @Benchmark
    public List<Double> vanilla() {
        List<Double> result = new ArrayList<Double>(sourceList.size() / 2 + 1);
        for (Integer i : sourceList) {
            if (i % 2 == 0){
                result.add(Math.sqrt(i));
            }
        }
        return result;
    }

    @Benchmark
    public List<Double> stream() {
        return  sourceList.stream().parallel()
                .mapToInt(Integer::intValue)
                .filter(i -> i % 2 == 0)
                .mapToDouble(i->(double)i)
                .map(Math::sqrt)
                .boxed()
                .collect(Collectors.toList());
    }

    @Benchmark
    public List<Double> rxjava2(){
        return Flowable.fromIterable(sourceList)
                       .parallel()
                       .runOn(Schedulers.computation())
                       .filter(i->i%2==0)
                       .map(Math::sqrt)
                       .collect(()->new ArrayList<Double>(sourceList.size()/2+1),ArrayList::add)
                       .sequential()
                       .blockingFirst();

    }

    public static void main(String[] args) throws RunnerException {

        Options options = new OptionsBuilder()
                .include(StreamVsVanilla.class.getSimpleName()).threads(1)
                .forks(1).shouldFailOnError(true).shouldDoGC(true)
                .jvmArgs("-server").build();
        new Runner(options).run();

    }
}

以上代码的结果:

# Run complete. Total time: 00:03:16

Benchmark                Mode  Cnt     Score     Error  Units
StreamVsVanilla.rxjava2  avgt   20  1179.733 ± 322.421  ns/op
StreamVsVanilla.stream   avgt   20    10.556 ±   1.195  ns/op
StreamVsVanilla.vanilla  avgt   20     8.220 ±   0.705  ns/op

即使我删除了parellal运算符并使用如下的顺序版本:

@OutputTimeUnit(TimeUnit.NANOSECONDS)
@BenchmarkMode(Mode.AverageTime)
@OperationsPerInvocation(StreamVsVanilla.N)
public class StreamVsVanilla {
    public static final int N = 10000;

    static List<Integer> sourceList = new ArrayList<>(N);
    static {
        for (int i = 0; i < N; i++) {
            sourceList.add(i);
        }
    }

    @Benchmark
    public List<Double> vanilla() {
        List<Double> result = new ArrayList<Double>(sourceList.size() / 2 + 1);
        for (Integer i : sourceList) {
            if (i % 2 == 0){
                result.add(Math.sqrt(i));
            }
        }
        return result;
    }

    @Benchmark
    public List<Double> stream() {
        return  sourceList.stream()
                .mapToInt(Integer::intValue)
                .filter(i -> i % 2 == 0)
                .mapToDouble(i->(double)i)
                .map(Math::sqrt)
                .boxed()
                .collect(Collectors.toList());
    }

    @Benchmark
    public List<Double> rxjava2(){
        return Observable.fromIterable(sourceList)
                       .filter(i->i%2==0)
                       .map(Math::sqrt)
                       .collect(()->new ArrayList<Double>(sourceList.size()/2+1),ArrayList::add)
                       .blockingGet();

    }

    public static void main(String[] args) throws RunnerException {

        Options options = new OptionsBuilder()
                .include(StreamVsVanilla.class.getSimpleName()).threads(1)
                .forks(1).shouldFailOnError(true).shouldDoGC(true)
                .jvmArgs("-server").build();
        new Runner(options).run();

    }
}

结果不是很有利:

# Run complete. Total time: 00:03:16

Benchmark                Mode  Cnt   Score   Error  Units
StreamVsVanilla.rxjava2  avgt   20  12.226 ± 0.603  ns/op
StreamVsVanilla.stream   avgt   20  13.432 ± 0.858  ns/op
StreamVsVanilla.vanilla  avgt   20   7.678 ± 0.350  ns/op

有人可以帮我弄清楚我做错了什么吗?

编辑:

akarnokd指出我在顺序版本中使用额外的阶段到我的流版本中的unbox和box(我添加它以避免在过滤器和地图方法中隐式装箱拆箱)但是它变慢了所以我尝试没有那些代码如下

@OutputTimeUnit(TimeUnit.NANOSECONDS)
@BenchmarkMode(Mode.AverageTime)
@OperationsPerInvocation(StreamVsVanilla.N)
public class StreamVsVanilla {
    public static final int N = 10000;

    static List<Integer> sourceList = new ArrayList<>(N);
    static {
        for (int i = 0; i < N; i++) {
            sourceList.add(i);
        }
    }

    @Benchmark
    public List<Double> vanilla() {
        List<Double> result = new ArrayList<Double>(sourceList.size() / 2 + 1);
        for (Integer i : sourceList) {
            if (i % 2 == 0){
                result.add(Math.sqrt(i));
            }
        }
        return result;
    }

    @Benchmark
    public List<Double> stream() {
        return  sourceList.stream()
                .filter(i -> i % 2 == 0)
                .map(Math::sqrt)
                .collect(Collectors.toList());
    }

    @Benchmark
    public List<Double> rxjava2(){
        return Observable.fromIterable(sourceList)
                       .filter(i->i%2==0)
                       .map(Math::sqrt)
                       .collect(()->new ArrayList<Double>(sourceList.size()/2+1),ArrayList::add)
                       .blockingGet();

    }

    public static void main(String[] args) throws RunnerException {

        Options options = new OptionsBuilder()
                .include(StreamVsVanilla.class.getSimpleName()).threads(1)
                .forks(1).shouldFailOnError(true).shouldDoGC(true)
                .jvmArgs("-server").build();
        new Runner(options).run();

    }
}

结果仍然或多或少相同:

# Run complete. Total time: 00:03:16

Benchmark                Mode  Cnt   Score   Error  Units
StreamVsVanilla.rxjava2  avgt   20  10.864 ± 0.555  ns/op
StreamVsVanilla.stream   avgt   20  10.466 ± 0.050  ns/op
StreamVsVanilla.vanilla  avgt   20   7.513 ± 0.136  ns/op
performance java-8 java-stream rx-java2
1个回答
4
投票

For the parallel version

启动并将值分派给多个线程相对昂贵。为了抵消这一点,并行计算通常比基础设施开销高几倍。但是,对于RxJava中的情况,Math :: sqrt是如此微不足道,并行开销在性能上占主导地位。

那么为什么Stream要快两个数量级呢?我只能假设线程窃取进入基准线程完成大部分实际工作的地方,也许一个后台线程会做一些其他的少量,因为在后台线程旋转时,主线程已经窃取了大部分任务背部。因此,您没有严格的并行执行,例如RxJava的并行执行,其中运算符以循环方式调度,以便所有并行轨道可以大致相等地忙碌。

For the sequential version

我认为你的Stream版本中有额外的拆箱和装箱阶段会增加一些开销。试试没有它:

   return  sourceList.stream()
            .filter(i -> i % 2 == 0)
            .map(Math::sqrt)
            .collect(Collectors.toList());
© www.soinside.com 2019 - 2024. All rights reserved.