反应流(monix)运算符组合,用于缓冲具有重叠元素的时间跨度

问题描述 投票:0回答:1

我有一个连续发射的 Observable[T] ,并且我想要一个 Observable[List[T]] ,它在指定持续时间内为源发射的每个元素发射最后一个元素。示例

Observable. range(0, 100)   
    .delayExecution(1.second)
    // -- add here some operator combination which takes the parameter of 3 seconds

//should output:
// t0: List(0)
// t+1s: List(0,1)
// t+2s: List(0,1,2)
// t+3s: List(1,2,3)
// t+4s: List(2,3,4)

请注意,发出的 List 包含指定持续时间内最后累积的元素,并且在每个源项上发出。 bufferTimed 运算符不会在每个源项上发出。

我正在考虑实现一个类似于 monix.reactive.internal.operators.BufferTimedObservable 的运算符,但具有上述逻辑,但如果有更简单的方法,我不想付出这种努力

scala observable rx-java reactive-programming monix
1个回答
0
投票

在评论中提出一些建议后,我想出了一些可以完成这项工作的东西,也许有人有更优雅的解决方案?

import monix.reactive.Observable
import monix.execution.Scheduler.Implicits.global
import java.time.ZonedDateTime
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps

val events = Observable.range(0, 100).delayOnNext(1 second)

val timeWindow = 5 seconds

val timestamps = events.map(_ => ZonedDateTime.now())

events.zip(timestamps).scan(Seq[(Long, ZonedDateTime)]())((acc, event) => {
  (acc :+ event).filter(_._2.isAfter(ZonedDateTime.now().minusSeconds(timeWindow.toSeconds)))
}).map(_.map(_._1)).foreach(println)
© www.soinside.com 2019 - 2024. All rights reserved.