我有一个连续发射的 Observable[T] ,并且我想要一个 Observable[List[T]] ,它在指定持续时间内为源发射的每个元素发射最后一个元素。示例
Observable. range(0, 100)
.delayExecution(1.second)
// -- add here some operator combination which takes the parameter of 3 seconds
//should output:
// t0: List(0)
// t+1s: List(0,1)
// t+2s: List(0,1,2)
// t+3s: List(1,2,3)
// t+4s: List(2,3,4)
请注意,发出的 List 包含指定持续时间内最后累积的元素,并且在每个源项上发出。 bufferTimed 运算符不会在每个源项上发出。
我正在考虑实现一个类似于 monix.reactive.internal.operators.BufferTimedObservable 的运算符,但具有上述逻辑,但如果有更简单的方法,我不想付出这种努力
在评论中提出一些建议后,我想出了一些可以完成这项工作的东西,也许有人有更优雅的解决方案?
import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import java.time.ZonedDateTime
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
val events = Observable.range(0, 100).delayOnNext(1 second)
val timeWindow = 5 seconds
val timestamps = events.map(_ => ZonedDateTime.now())
events.zip(timestamps).scan(Seq[(Long, ZonedDateTime)]())((acc, event) => {
(acc :+ event).filter(_._2.isAfter(ZonedDateTime.now().minusSeconds(timeWindow.toSeconds)))
}).map(_.map(_._1)).foreach(println)