我需要严格地一一处理不同类型的事件,但是在后台线程中。
根据文档的下一个代码下一个代码Schedulers.from(executor, false, true);应该满足我的要求,但实际上并没有。
代码:
ExecutorService executor = Executors.newSingleThreadExecutor();
Scheduler scheduler = Schedulers.from(executor, false, true);
PublishSubject<String> subject1 = PublishSubject.create();
PublishSubject<String> subject2 = PublishSubject.create();
subject1.observeOn(scheduler).subscribe(log::info);
subject2.observeOn(scheduler).subscribe(log::info);
subject1.onNext("Hello11");
subject2.onNext("Hello21");
subject1.onNext("Hello12");
subject2.onNext("Hello22");
subject1.onNext("Hello13");
subject2.onNext("Hello23");
log.info("Test activity");
有下一个输出:
22:19:05.313 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Test activity
22:19:05.313 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello11
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello12
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello13
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello21
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello22
22:19:05.316 [pool-1-thread-1] INFO c.c.t.junit5example.RxJavaTest -- Hello23
这表明,当每个观察者在释放
scheduler
之前给出所有事件时,事件处理是以贪婪的方式执行的。这与与文档相矛盾
.
如果将
.observeOn(scheduler)
替换为 .subscribeOn(scheduler)
,则输出为下一个:
22:23:56.162 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello11
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello21
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello12
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello22
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello13
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Hello23
22:23:56.164 [ForkJoinPool-1-worker-1] INFO c.c.t.junit5example.RxJavaTest -- Test activity
它在同一个线程中执行所有事件,这与
.subscribeOn
的整个想法相矛盾。
这是一个错误还是有办法让它按照文档中的预期工作? 版本是
io.reactivex.rxjava3:rxjava:3.1.9
这不是
Scheduler
的错误,而是 observeOn
总是以贪婪方式运行的结果。在第一种情况下,因为第一个序列的所有项目几乎立即可供observeOn使用,所以它在一次执行程序运行中在同一线程上发出这些项目。
您可以使用另一个运算符为每个项目创建一个任务,例如零延迟的
delay
,以获得更好的交错:
subject1.delay(0, TimeUnit.MILLISECONDS, scheduler).subscribe(log::info);
subject2.delay(0, TimeUnit.MILLISECONDS, scheduler).subscribe(log::info);
第二种情况按预期工作,因为在
subscribeOn
上使用 Subject
对其交付的物品没有影响。在您的情况下,这些项目被发出,因此在同一线程上进行处理,就像没有 subscribeOn
时一样。