如果只有另一个流已经发出了项目并且在Rxjava中没有使用它,我该如何发出它

问题描述 投票:1回答:2

我的问题看起来很简单,但请参阅下文。

stream alphabet : ----------------------(A)----------------------------(B)-----
stream number   : ---(1)-----(2)------------------(3)----(4)----(5)----------(6)----
emission        :    true    true                false   true   true         false    
  • 如果没有字母项目,则为真,则为真
  • 发出错误,表示发出了字母项目,但尚未消耗
  • 如果消耗了最后一个发射的字母,则为真

有什么好的运算符吗??

android rx-java rx-android
2个回答
0
投票

如果Alphabet(steamA)是可观察到的热门话题,我认为此骇客的基于Take / Skip的代码可以回答您的问题:

Observable.merge(streamA, Observable.just("init"))
  .switchMap(a -> {
    if ("init".equals(a)) return streamN.map(n -> true);
    return Observable.merge(
     streamN.take(1).map(n -> false),
     streamN.skip(1).map(n -> true)
   )
}

0
投票

对我来说,尚不清楚“尚未消耗字母表项排放”对您的意义。为此,需要知道或控制字母的接收者。另外,布尔输出还有第二个使用者,因此两个流之间存在相关性。

Observable<String> alphabetFlow = ...

Observable<Integer> numberFlow = ...

AtomicBoolean consumed = new AtomicBoolean(true);

alphabetFlow
    .observeOn(Schedulers.io())
    .doOnNext(letter -> consumed.set(false))
    .doAfterNext(letter -> consumed.set(true))
    .subscribe(letter -> {
         System.out.println("I'm consuming" + letter));
         Thread.sleep(2000); // simulate processing
         System.out.println("I've consumed" + letter));
    });

numberFlow
    .map(number -> consumed.get())
    .subscribe(status -> System.out.println("Status: " + status));

Thread.sleep(100000); // in case you have this in main(String[] args)
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.