Reactor 项目的 Javadoc 指出 关于方法:
public final Flux<T> subscribeOn(Scheduler scheduler)
那个
Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker. As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain ...
我有下一个单元测试
@Test
void testSubscribeOn() {
// when
var flux = Flux.just("alex", "adam", "andrew")
.subscribeOn(Schedulers.parallel())
.log();
// then
StepVerifier.create(flux)
.expectNextCount(3)
.verifyComplete();
}
我希望记录的消息将显示 onSubscribe、request、onNext、onComplete 将在并行调度程序的线程中运行。 但我的测试表明 onSubscribe 和 request 是从调用线程调用的:
16:09:21.286 [Test worker] INFO reactor.Flux.SubscribeOn.1 -- onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
16:09:21.290 [Test worker] INFO reactor.Flux.SubscribeOn.1 -- request(unbounded)
16:09:21.293 [parallel-1] INFO reactor.Flux.SubscribeOn.1 -- onNext(alex)
16:09:21.293 [parallel-1] INFO reactor.Flux.SubscribeOn.1 -- onNext(adam)
16:09:21.293 [parallel-1] INFO reactor.Flux.SubscribeOn.1 -- onNext(andrew)
16:09:21.293 [parallel-1] INFO reactor.Flux.SubscribeOn.1 -- onComplete()
subscribeOn 将影响upstream流的订阅。因此,步骤验证者的onSubscribe和request不会受到影响,因为它们发生在下游。只有对通量的内部/隐藏订阅才绑定到并行调度程序。由于此订阅是在指定的调度程序上完成的,因此其元素(onNext 信号)在同一调度程序上发出。
因此,如果您更改
log()
语句的位置,如下所示:
var flux = Flux.just("alex", "adam", "andrew")
.log()
.subscribeOn(Schedulers.parallel())
然后你将获得预期的行为:
[ INFO] (parallel-1) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (parallel-1) | request(unbounded)
[ INFO] (parallel-1) | onNext(alex)
[ INFO] (parallel-1) | onNext(adam)
[ INFO] (parallel-1) | onNext(andrew)
[ INFO] (parallel-1) | onComplete()