并行运行 Flux 操作

问题描述 投票:0回答:1

我有一个项目列表,我需要从不同来源检索数据、合并并返回这些项目。这些项目是完全独立的,获取数据将花费最多的时间,所以我希望它并行完成。我以为我可以使用 Flux.fromStream() 和 Mono.zipWith() 在 Reactor 中做到这一点,但它似乎在等待每个项目完成后再开始下一个项目。

这是我写的一个测试来说明这个问题。该代码制作了 10 个三明治,并模拟了获取花生酱和果冻的随机延迟:

class FluxOrderingTest {
  private final Random random = new Random();

  @Test
  void testAsyncSandwich() {
    Flux<String> sandwiches = Flux.fromStream(IntStream.rangeClosed(1, 10).boxed())
            .flatMap(number -> getJelly(number).zipWith(getPeanutButter(number)))
            .map(this::makeSandwich);

    StepVerifier.create(sandwiches)
            .expectNextCount(10)
            .verifyComplete();
  }

  private String makeSandwich(Tuple2<String, String> ingredients) {
    System.out.printf("Combined %s with %s%n", ingredients.getT1(), ingredients.getT2());
    return "Sandwich";
  }

  private Mono<String> getPeanutButter(Integer number) {
    return Mono.fromSupplier(() -> withDelay("the peanut butter for " + number));
  }

  private Mono<String> getJelly(Integer number) {
    return Mono.fromSupplier(() -> withDelay("the jelly for " + number));
  }

  private String withDelay(String s) {
    try {
      Thread.sleep(random.nextInt(1000));
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("Got " + s);
    return s;
  }
}

在输出中,我预计三明治会乱序完成,因为获取配料所需的时间是随机的。但是,Reactor 会按顺序完成每个三明治。

Got the jelly for 1
Got the peanut butter for 1
Combined the jelly for 1 with the peanut butter for 1
Got the jelly for 2
Got the peanut butter for 2
Combined the jelly for 2 with the peanut butter for 2
Got the jelly for 3
Got the peanut butter for 3
Combined the jelly for 3 with the peanut butter for 3
Got the jelly for 4
Got the peanut butter for 4
Combined the jelly for 4 with the peanut butter for 4
Got the jelly for 5
Got the peanut butter for 5
Combined the jelly for 5 with the peanut butter for 5
Got the jelly for 6
Got the peanut butter for 6
Combined the jelly for 6 with the peanut butter for 6
Got the jelly for 7
Got the peanut butter for 7
Combined the jelly for 7 with the peanut butter for 7
Got the jelly for 8
Got the peanut butter for 8
Combined the jelly for 8 with the peanut butter for 8
Got the jelly for 9
Got the peanut butter for 9
Combined the jelly for 9 with the peanut butter for 9
Got the jelly for 10
Got the peanut butter for 10
Combined the jelly for 10 with the peanut butter for 10

既然我不关心最后的订单,我该如何安排同时取走所有的原料?

java asynchronous spring-webflux project-reactor
1个回答
0
投票

您的代码是合成的,具有实际 IO 的真实代码的行为会有所不同,因为它会为 IO 使用不同的Schedulers。当然,这取决于您如何进行 IO。如果你使用反应式 IO,你应该没有任何问题。

在您的情况下,它只是一个调度程序,可以立即运行所有内容,即在一个线程中。

尝试订阅不同的调度程序:

.flatMap(number -> 
    getJelly(number)
        .subscribeOn(Schedulers.boundedElastic())
        .zipWith(
            getPeanutButter(number)
                .subscribeOn(Schedulers.boundedElastic())
        )
        
)
© www.soinside.com 2019 - 2024. All rights reserved.