并行执行缓冲通量上的映射

问题描述 投票:0回答:1

我有一种变化,所有的价值观同时出现。我想对这些值的块并行执行一项时间密集型任务。为什么这段代码在同一线程上执行每个映射而不是并行执行?

@Test
public void test() throws InterruptedException {
    Flux.just("a", "b", "c", "d", "e", "f", "g")
        .buffer(2)
        .publishOn(Schedulers.parallel())
        .map(this::doTimeintensiveStuff)
        .doOnNext(val -> {
            log.info("[" + Thread.currentThread().getName() + "]  " + val);
        })
        .blockLast();
}

private String doTimeintensiveStuff(List<String> input) {
    return String.join(", ", input); // this is just a place holder
}

我的日志输出如下所示

Aug. 07, 2023 4:44:50 PM de.test.ParallelTest lambda$1
INFORMATION: [parallel-1]  a, b
Aug. 07, 2023 4:44:50 PM de.test.ParallelTest lambda$1
INFORMATION: [parallel-1]  c, d
Aug. 07, 2023 4:44:50 PM de.test.ParallelTest lambda$1
INFORMATION: [parallel-1]  e, f
Aug. 07, 2023 4:44:50 PM de.stest.ParallelTest lambda$1
INFORMATION: [parallel-1]  g
java spring-webflux project-reactor
1个回答
0
投票

应该是:

public void test() throws InterruptedException {
    Flux.just("a", "b", "c", "d", "e", "f", "g")
        .buffer(2)
        .parallel()
        .runOn(Schedulers.parallel())
        .map(this::doTimeintensiveStuff)
        .sequential()
        .doOnNext(val -> {
            log.info("[" + Thread.currentThread().getName() + "]  " + val);
        })
        .blockLast();
}

因为

.publishOn(..)
不会派生新线程,它只是定义当前线程。但是
.parallel().runOn(..)
使用并行线程来执行。

© www.soinside.com 2019 - 2024. All rights reserved.