Apache Beam-仅滑动窗口发射最早的活动窗口

问题描述 投票:0回答:1

我正在尝试使用Apache Beam(通过Scio)对流源中最近三天的数据(处理时间)进行连续汇总,并从最早中输出结果,< [活动每5分钟显示一次窗口。 Earliest表示开始时间最早的窗口,active表示尚未结束窗口。本质上,我试图通过删除滑动窗口之间的不重叠时间来获得“滚动”聚合。

使用大小为3天,期限为1天的示例滑动窗口对我要完成的工作的可视化:

early firing - ^ no firing - x | ** stop firing from this window once time passes this point ^ ^ ^ ^ ^ ^ ^ ^ | | | | | | | | ** stop firing from this window once time passes this point w1: +====================+^ ^ ^ x x x x x x x | | | w2: +====================+^ ^ ^ x x x x x x x | | | w3: +====================+ time: ----d1-----d2-----d3-----d4-----d5-----d6-----d7---->

我已经尝试过使用滑动窗口(大小= 3天,周期= 5分钟),但是将来它们每3天/ 5分钟组合就会产生一个新窗口,并且会为

每个窗口]发出早期结果。 。我尝试使用trigger = AfterWatermark.pastEndOfWindow(),但在工作首次开始时需要早期结果。我尝试过比较窗口之间的pane数据(isLasttimestamp等),但它们看起来相同。

我最近的尝试,似乎有些骇人听闻,包括将窗口信息附加到DoFn中的每个键上,重新进入固定窗口,并尝试从附加数据中分组并缩小到最旧的窗口,但是final reduceByKey似乎什么都不输出。

DoFn附加窗口信息

// ValueType is just a case class I'm using for objects type DoFnT = DoFn[KV[String, ValueType], KV[String, (ValueType, Instant)]] class Test extends DoFnT { // Window.toString looks like the following: // [2020-05-16T23:57:00.000Z..2020-05-17T00:02:00.000Z) def parseWindow(window: String): Instant = { Instant.parse( window .stripPrefix("[") .stripSuffix(")") .split("\\.\\.")(1)) } @ProcessElement def process( context: DoFnT#ProcessContext, window: BoundedWindow): Unit = { context.output( KV.of( context.element().getKey, (context.element().getValue, parseWindow(window.toString)) ) ) } }

sc
  .pubsubSubscription(...)
  .keyBy(_.key)
  .withSlidingWindows(
    size = Duration.standardDays(3),
    period = Duration.standardMinutes(5),
    options = WindowOptions(
      accumulationMode = DISCARDING_FIRED_PANES,
      allowedLateness = Duration.ZERO,
      trigger = Repeatedly.forever(
        AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(
            AfterProcessingTime
              .pastFirstElementInPane()
              .plusDelayOf(Duration.standardMinutes(1)))))))
  .reduceByKey(ValueType.combineFunction())
  .applyPerKeyDoFn(new Test())
  .withFixedWindows(
    duration = Duration.standardMinutes(5),
    options = WindowOptions(
      accumulationMode = DISCARDING_FIRED_PANES,
      trigger = AfterWatermark.pastEndOfWindow(),
      allowedLateness = Duration.ZERO))
  .reduceByKey((x, y) => if (x._2.isBefore(y._2)) x else y)
  .saveAsCustomOutput(
    TextIO.write()...
  )
有什么建议吗?

我正在尝试使用Apache Beam(通过Scio)对流源中的最近三天数据(处理时间)进行连续汇总,并从最早的活动窗口中输出结果,每隔...

google-cloud-dataflow apache-beam spotify-scio
1个回答
0
投票
首先,关于处理时间:如果要根据处理时间进行窗口处理,则应将事件时间设置为处理时间。很好-这表示您正在处理的事件是摄取记录的事件,而不是记录所代表的事件。

现在您可以使用现成的滑动窗口以所需的方式对所需的聚合进行分组。

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.