我的问题看起来很简单,但请参阅下文。
stream alphabet : ----------------------(A)----------------------------(B)-----
stream number : ---(1)-----(2)------------------(3)----(4)----(5)----------(6)----
emission : true true false true true false
有什么好的运算符吗??
如果Alphabet(steamA)是可观察到的热门话题,我认为此骇客的基于Take / Skip的代码可以回答您的问题:
Observable.merge(streamA, Observable.just("init"))
.switchMap(a -> {
if ("init".equals(a)) return streamN.map(n -> true);
return Observable.merge(
streamN.take(1).map(n -> false),
streamN.skip(1).map(n -> true)
)
}
对我来说,尚不清楚“尚未消耗字母表项排放”对您的意义。为此,需要知道或控制字母的接收者。另外,布尔输出还有第二个使用者,因此两个流之间存在相关性。
Observable<String> alphabetFlow = ...
Observable<Integer> numberFlow = ...
AtomicBoolean consumed = new AtomicBoolean(true);
alphabetFlow
.observeOn(Schedulers.io())
.doOnNext(letter -> consumed.set(false))
.doAfterNext(letter -> consumed.set(true))
.subscribe(letter -> {
System.out.println("I'm consuming" + letter));
Thread.sleep(2000); // simulate processing
System.out.println("I've consumed" + letter));
});
numberFlow
.map(number -> consumed.get())
.subscribe(status -> System.out.println("Status: " + status));
Thread.sleep(100000); // in case you have this in main(String[] args)