Project Reactor 的 subscribeOn() 不会影响 onSubscribe() 在指定 Scheduler 中调用,如 JavaDoc 中所述

问题描述 投票:0回答:1

Reactor 项目的 Javadoc 指出 关于方法:

public final Flux<T> subscribeOn(Scheduler scheduler)

那个

Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker. As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain ...

我有下一个单元测试

    @Test
    void testSubscribeOn() {
        // when
        var flux = Flux.just("alex", "adam", "andrew")
                .subscribeOn(Schedulers.parallel())
                .log();

        // then
        StepVerifier.create(flux)
                .expectNextCount(3)
                .verifyComplete();
    }

我希望记录的消息将显示 onSubscribe、request、onNext、onComplete 将在并行调度程序的线程中运行。 但我的测试表明 onSubscribe 和 request 是从调用线程调用的:

16:09:21.286 [Test worker] INFO reactor.Flux.SubscribeOn.1 -- onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
16:09:21.290 [Test worker] INFO reactor.Flux.SubscribeOn.1 -- request(unbounded)
16:09:21.293 [parallel-1] INFO reactor.Flux.SubscribeOn.1 -- onNext(alex)
16:09:21.293 [parallel-1] INFO reactor.Flux.SubscribeOn.1 -- onNext(adam)
16:09:21.293 [parallel-1] INFO reactor.Flux.SubscribeOn.1 -- onNext(andrew)
16:09:21.293 [parallel-1] INFO reactor.Flux.SubscribeOn.1 -- onComplete()
java project-reactor
1个回答
0
投票

subscribeOn 将影响upstream流的订阅。因此,步骤验证者onSubscriberequest不会受到影响,因为它们发生在下游。只有对通量的内部/隐藏订阅才绑定到并行调度程序。由于此订阅是在指定的调度程序上完成的,因此其元素(onNext 信号)在同一调度程序上发出。

因此,如果您更改

log()
语句的位置,如下所示:

var flux = Flux.just("alex", "adam", "andrew")
               .log()
               .subscribeOn(Schedulers.parallel())

然后你将获得预期的行为:

[ INFO] (parallel-1) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (parallel-1) | request(unbounded)
[ INFO] (parallel-1) | onNext(alex)
[ INFO] (parallel-1) | onNext(adam)
[ INFO] (parallel-1) | onNext(andrew)
[ INFO] (parallel-1) | onComplete()
© www.soinside.com 2019 - 2024. All rights reserved.