RxJava 3:如何公平地处理多个订阅?

问题描述 投票:0回答:1

我需要严格地一一处理不同类型的事件,但是在后台线程中。

根据文档的下一个代码下一个代码Schedulers.from(executor, false, true);应该满足我的要求,但实际上并没有。

代码:

ExecutorService executor = Executors.newSingleThreadExecutor();
Scheduler scheduler = Schedulers.from(executor, false, true);
PublishSubject<String> subject1 = PublishSubject.create();
PublishSubject<String> subject2 = PublishSubject.create();

subject1.observeOn(scheduler).subscribe(log::info);
subject2.observeOn(scheduler).subscribe(log::info);

subject1.onNext("Hello11");
subject2.onNext("Hello21");
subject1.onNext("Hello12");
subject2.onNext("Hello22");
subject1.onNext("Hello13");
subject2.onNext("Hello23");

log.info("Test activity");

有下一个输出:

22:19:05.313 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Test activity
22:19:05.313 [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello11
22:19:05.316 [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello12
22:19:05.316 [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello13
22:19:05.316 [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello21
22:19:05.316 [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello22
22:19:05.316 [pool-1-thread-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello23

这表明,当每个观察者在释放

scheduler
之前给出所有事件时,事件处理是以贪婪的方式执行的。这与与文档相矛盾 .

如果将

.observeOn(scheduler)
替换为
.subscribeOn(scheduler)
,则输出为下一个:

22:23:56.162 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello11
22:23:56.164 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello21
22:23:56.164 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello12
22:23:56.164 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello22
22:23:56.164 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello13
22:23:56.164 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Hello23
22:23:56.164 [ForkJoinPool-1-worker-1] INFO  c.c.t.junit5example.RxJavaTest -- Test activity

它在同一个线程中执行所有事件,这与

.subscribeOn
的整个想法相矛盾。

这是一个错误还是有办法让它按照文档中的预期工作? 版本是

io.reactivex.rxjava3:rxjava:3.1.9

java rx-java rx-java3
1个回答
0
投票

这不是

Scheduler
的错误,而是
observeOn
总是以贪婪方式运行的结果。在第一种情况下,因为第一个序列的所有项目几乎立即可供observeOn使用,所以它在一次执行程序运行中在同一线程上发出这些项目。

您可以使用另一个运算符为每个项目创建一个任务,例如零延迟的

delay
,以获得更好的交错:

subject1.delay(0, TimeUnit.MILLISECONDS, scheduler).subscribe(log::info);
subject2.delay(0, TimeUnit.MILLISECONDS, scheduler).subscribe(log::info);

第二种情况按预期工作,因为在

subscribeOn
上使用
Subject
对其交付的物品没有影响。在您的情况下,这些项目被发出,因此在同一线程上进行处理,就像没有
subscribeOn
时一样。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.