Scala Fs2:无限流上的聚合计算

问题描述 投票:0回答:1

我似乎无法理解如何在无限流上执行聚合计算。获取无限的元素流并对每个元素单独执行计算很容易,但收集多个元素并执行聚合计算似乎是不可能的。

我使用的是 scala 和 Fs2,但这可能适用于任何无限流媒体库。

假设您有无限的事件流,并附加了一些时间戳

Stream[F, Event]
事件在哪里
case class Event(time: Instant, data: Json)

我们想要对每分钟的事件进行一些简单的分析 - 也许计算数量,但计算本身是微不足道的。假设我们有一个现有的函数

def analyse: List[Event] => Result
其中 Result 只是一个字符串包装器,存储一些未知分析的输出
case class Result(v: String)

如果我的理解是正确的,我们需要定义一个 Pipe

Pipe[F, Event, Result]

我无法理解它如何与无限流一起工作,也不知道如何实现它。

从逻辑上讲,我的想法建议将流分成每小时有限的事件流,将其转换为列表,然后将其传递给

analyse
函数,但无限流的分裂感觉不合逻辑,就好像你要保留分割一个永远不会结束的无限流。

scala scala-cats fs2
1个回答
0
投票

由于您的目标是对每分钟发生的事件进行分析,因此您可能应该首先建立某种边界事件来表示每个缓冲期的开始/结束。您可以使用

Stream.awakeEvery(finiteDuration)
创建一个定期发出事件的流。然后,您可以使用
Stream#either
将其与事件流合并,以获得
Stream[F, Either[Event, FiniteDuration]]

从那里开始,最简单的方法可能是使用

Stream#scan
Stream#evalScan
在某种状态下折叠,例如

val events: Stream[F, Event] = ???
def analyze(buffer: List[Event]): Result = ???

// an event buffer, and an optional value to emit
type ScanState = (List[Event], Option[Result])

val analyzedEvents: Stream[F, Result] = events
  .either(Stream.awakeEvery[F](1.minute))
  .scan[ScanState](Nil -> None) { 
    case ((buffer, _), Left(event)) =>
      // add the event to the buffer (in reverse order for O(1) prepend)
      (event :: buffer, None)
    case ((buffer, _), Right(timerTick)) =>
      // perform analysis, clear buffer, emit result
      val result = analyze(buffer.reverse)
      (Nil, Some(result))
  }
  .unNone // omit `None`s to just emit plain `Result`

还有流的

Pull
接口,您可以使用它代替
scan
来实现对如何使用输入和发出输出的更细粒度的控制。请参阅 https://fs2.io/#/guide?id=transforming-streams-using-pulls 了解相关介绍。

请注意,如果您的

awakeEvery
持续时间增加,事件缓冲区的潜在大小也会增加,因此您需要在决定如何构建和清除缓冲区时考虑到这一点。

© www.soinside.com 2019 - 2024. All rights reserved.