RxJava PublishSubject 缓冲区元素超时

问题描述 投票:0回答:1

我想用 RxJava 实现以下目标:

  1. 缓冲元素并在最后一个元素过去 5 秒后发布它们
  2. 在第一个元素之后的 20 秒内发布缓冲元素

元素示例:

[A -> 2 秒 -> B -> 3 秒 -> C -> 6 秒 -> D -> 4 秒 -> E -> 9 秒 -> F -> 1 秒 -> G -> 15 秒 - > H]

结果应该是:

[A、B、C]

[D、E、F]

[G,H]

目前,我可以在第一个元素生成后延迟 20 秒后发布元素,我应该更新什么来实现第一部分?

fun <T> Observable<T>.buffered(): Observable<List<T>> = publish { shared ->

    val startEvent = shared.throttleFirst(20, TimeUnit.SECONDS, scheduler)

    shared.buffer(startEvent.mergeWith(startEvent.delay(20, TimeUnit.SECONDS, scheduler, false)))

}
java android kotlin rx-java rx-java2
1个回答
0
投票

对于规则 1,您需要

debounce
。对于规则 2,它变得复杂。

在序列中,当前的第一个事件应触发 20 秒的计时器,之后发出信号以启动新的缓冲区。然而,如果与规则 1 有 5 秒的差距,则必须取消此 20 秒计时器。

这是一个打印时间和缓冲区的示例来说明解决方案:

import java.util.List;
import java.util.concurrent.*;
import io.reactivex.rxjava3.core.*;

void main(String[] args) {
    
    // The signal pattern
    var source = Observable.fromArray(1, 2, 3)
    .concatWith(Observable.range(10, 25))
    .concatWith(Observable.range(45, 4))
    .flatMap(v -> Observable.just(v).delay(v, TimeUnit.SECONDS))
    .doOnNext(v -> System.out.println("Tick - " + v));
    
    // buffering action
    source.publish(shared -> {
        var db = shared.debounce(5, TimeUnit.SECONDS)
                .doOnNext(v -> System.out.println("Debounce 5 seconds - " + v))
                .publish()
                .autoConnect();
        
        var wnd = shared
                .take(1)
                .delay(20, TimeUnit.SECONDS)
                .takeUntil(db)
                .repeat()
                .doOnNext(v -> System.out.println("Window 20 seconds - " + v));
        
        return shared.buffer(db.mergeWith(wnd));
    })
    .blockingSubscribe(System.out::println);
}

我们有两个信号,

db
用于 5 秒去抖,
wnd
用于 20 秒窗口。

在去抖部分,我们去抖5秒。因为我们需要窗口部分的去抖信号本身,所以我们必须发布+自动连接该部分以避免来自两个订阅的双重去抖信号。

在窗口部分,我们从共享源中获取一项,并将其延迟 20 秒。然而,如果去抖部分首先发出信号,我们必须取消该序列,这样它就不会创建不需要的窗口。接下来,重复将确保如果 20 秒过去或延迟被取消,我们将开始备份并等待下一个源项目重复该过程。

© www.soinside.com 2019 - 2024. All rights reserved.