我想用 RxJava 实现以下目标:
元素示例:
[A -> 2 秒 -> B -> 3 秒 -> C -> 6 秒 -> D -> 4 秒 -> E -> 9 秒 -> F -> 1 秒 -> G -> 15 秒 - > H]
结果应该是:
[A、B、C]
[D、E、F]
[G,H]
目前,我可以在第一个元素生成后延迟 20 秒后发布元素,我应该更新什么来实现第一部分?
fun <T> Observable<T>.buffered(): Observable<List<T>> = publish { shared ->
val startEvent = shared.throttleFirst(20, TimeUnit.SECONDS, scheduler)
shared.buffer(startEvent.mergeWith(startEvent.delay(20, TimeUnit.SECONDS, scheduler, false)))
}
对于规则 1,您需要
debounce
。对于规则 2,它变得复杂。
在序列中,当前的第一个事件应触发 20 秒的计时器,之后发出信号以启动新的缓冲区。然而,如果与规则 1 有 5 秒的差距,则必须取消此 20 秒计时器。
这是一个打印时间和缓冲区的示例来说明解决方案:
import java.util.List;
import java.util.concurrent.*;
import io.reactivex.rxjava3.core.*;
void main(String[] args) {
// The signal pattern
var source = Observable.fromArray(1, 2, 3)
.concatWith(Observable.range(10, 25))
.concatWith(Observable.range(45, 4))
.flatMap(v -> Observable.just(v).delay(v, TimeUnit.SECONDS))
.doOnNext(v -> System.out.println("Tick - " + v));
// buffering action
source.publish(shared -> {
var db = shared.debounce(5, TimeUnit.SECONDS)
.doOnNext(v -> System.out.println("Debounce 5 seconds - " + v))
.publish()
.autoConnect();
var wnd = shared
.take(1)
.delay(20, TimeUnit.SECONDS)
.takeUntil(db)
.repeat()
.doOnNext(v -> System.out.println("Window 20 seconds - " + v));
return shared.buffer(db.mergeWith(wnd));
})
.blockingSubscribe(System.out::println);
}
我们有两个信号,
db
用于 5 秒去抖,wnd
用于 20 秒窗口。
在去抖部分,我们去抖5秒。因为我们需要窗口部分的去抖信号本身,所以我们必须发布+自动连接该部分以避免来自两个订阅的双重去抖信号。
在窗口部分,我们从共享源中获取一项,并将其延迟 20 秒。然而,如果去抖部分首先发出信号,我们必须取消该序列,这样它就不会创建不需要的窗口。接下来,重复将确保如果 20 秒过去或延迟被取消,我们将开始备份并等待下一个源项目重复该过程。