为什么Reactor的Mono.fromCompletionStage比普通的CompletableFuture慢?

问题描述 投票:1回答:1

我有一段简单的代码,它在后台 "处理 "数据,并且在每一个 nth 项目,记录上一个项目花费的总时间。n 项目。

class BackgroundWorker implements AutoCloseable {
  private final ExecutorService thread = Executors.newSingleThreadExecutor();
  private final int reportEvery;
  private int processed;
  private LocalTime begin;

  BackgroundWorker(int reportEvery) {
    this.reportEvery = reportEvery;
  }

  CompletableFuture<Boolean> process(int item) {
    var future = new CompletableFuture<Boolean>();
    thread.submit(() ->  {
      try {
        if (processed == 0) {
          begin = LocalTime.now();
        }
        if (++processed == reportEvery) {
          System.out.format("Processed %d items in %dms%n",
              processed, ChronoUnit.MILLIS.between(begin, LocalTime.now()));
          processed = 0;
        }
        future.complete(true);
      } catch (Exception ex) {
        future.complete(false);
      }
    });
    return future;
  }

  @Override
  public void close() {
    thread.shutdownNow();
  }
}

那么我有一个 Flux 将数据输入到 BackgroundWorker,算上 CompletableFuture的成功完成。

Flux<Integer> numbers = Flux.fromStream(IntStream.range(0, 100000).boxed());
try (var worker = new BackgroundWorker(10000)) {
  int successCount = numbers
      .map(worker::process)
      .map(future -> future.thenApply(success -> success ? 1 : 0))
      .reduce(
          CompletableFuture.completedFuture(0),
          (acc, curr) -> acc.thenCombine(curr, Integer::sum))
      .block()
      .join();

  System.out.println("Done; success: " + successCount);
}

同样的代码,但现在使用的是 Mono.fromCompletionStage 而不是。

int successCount = numbers
    .map(worker::process)
    .map(Mono::fromCompletionStage)
    .map(mono -> mono.map(success -> success ? 1 : 0))
    .reduce(
        Mono.just(0),
        (acc, curr) -> acc.zipWith(curr, Integer::sum))
    .block()
    .block();

第一个使用期货的版本会打印出这样的内容:

Processed 10000 items in 48ms
Processed 10000 items in 17ms
Processed 10000 items in 10ms
Processed 10000 items in 8ms
Processed 10000 items in 9ms
Processed 10000 items in 5ms
Processed 10000 items in 5ms
Processed 10000 items in 4ms
Processed 10000 items in 3ms
Processed 10000 items in 4ms
Done; success: 100000

但是使用 Mono.fromCompletionStage 打印。

Processed 10000 items in 138ms
Processed 10000 items in 253ms
Processed 10000 items in 327ms
Processed 10000 items in 477ms
Processed 10000 items in 315ms
Processed 10000 items in 379ms
Processed 10000 items in 448ms
Processed 10000 items in 509ms
Processed 10000 items in 595ms
Processed 10000 items in 668ms
Done; success: 100000

为什么要使用 Mono 而不是 CompletableFuture 导致性能下降这么多?

java project-reactor completable-future
1个回答
1
投票

看来压缩的 Monos是占用时间最多、影响执行的东西。可能是因为这样的压缩每次都会创建一个新的MonoZip实例。

但这时你不必使用缩减和压缩。比较习惯的做法是 flatMap 的单体,得到一个 Flux<Integer> 你将减少而不产生中间垃圾。

另外,由于期货基本上是在创建时就开始处理,你可以做一个更简单的 concatMap (更少的开销,而且必须等待每个单子的完成,在这一点上并不重要,因为反正所有的期货已经在后台运行了)。

Flux<Integer> numbers = Flux.fromStream(IntStream.range(0, 100_000).boxed());
try (BackgroundWorker worker = new BackgroundWorker(10000)) {
    int successCount = numbers
            .map(worker::process)
            .concatMap(future -> Mono.fromCompletionStage(future))
            .map(success -> success ? 1 : 0)
            .reduce(0, Integer::sum)
            .block();

    System.out.println("Done; success: " + successCount);
}

你甚至可以通过避免从布尔值到int的映射来减少更多的开销,并在reduce中进行。

.reduce(0, (acc, bool) -> bool ? acc + 1 : acc)
© www.soinside.com 2019 - 2024. All rights reserved.