我正在尝试使用Apache Beam(通过Scio)对流源中最近三天的数据(处理时间)进行连续汇总,并从最早中输出结果,< [活动每5分钟显示一次窗口。 Earliest表示开始时间最早的窗口,active表示尚未结束窗口。本质上,我试图通过删除滑动窗口之间的不重叠时间来获得“滚动”聚合。
使用大小为3天,期限为1天的示例滑动窗口对我要完成的工作的可视化:early firing - ^ no firing - x
|
** stop firing from this window once time passes this point
^ ^ ^ ^ ^ ^ ^ ^
| | | | | | | | ** stop firing from this window once time passes this point
w1: +====================+^ ^ ^
x x x x x x x | | |
w2: +====================+^ ^ ^
x x x x x x x | | |
w3: +====================+
time: ----d1-----d2-----d3-----d4-----d5-----d6-----d7---->
我已经尝试过使用滑动窗口(大小= 3天,周期= 5分钟),但是将来它们每3天/ 5分钟组合就会产生一个新窗口,并且会为我最近的尝试,似乎有些骇人听闻,包括将窗口信息附加到DoFn中的每个键上,重新进入固定窗口,并尝试从附加数据中分组并缩小到最旧的窗口,但是final每个窗口]发出早期结果。 。我尝试使用
trigger = AfterWatermark.pastEndOfWindow()
,但在工作首次开始时需要早期结果。我尝试过比较窗口之间的pane
数据(isLast
,timestamp
等),但它们看起来相同。
reduceByKey
似乎什么都不输出。DoFn附加窗口信息
// ValueType is just a case class I'm using for objects
type DoFnT = DoFn[KV[String, ValueType], KV[String, (ValueType, Instant)]]
class Test extends DoFnT {
// Window.toString looks like the following:
// [2020-05-16T23:57:00.000Z..2020-05-17T00:02:00.000Z)
def parseWindow(window: String): Instant = {
Instant.parse(
window
.stripPrefix("[")
.stripSuffix(")")
.split("\\.\\.")(1))
}
@ProcessElement
def process(
context: DoFnT#ProcessContext,
window: BoundedWindow): Unit = {
context.output(
KV.of(
context.element().getKey,
(context.element().getValue, parseWindow(window.toString))
)
)
}
}
sc
.pubsubSubscription(...)
.keyBy(_.key)
.withSlidingWindows(
size = Duration.standardDays(3),
period = Duration.standardMinutes(5),
options = WindowOptions(
accumulationMode = DISCARDING_FIRED_PANES,
allowedLateness = Duration.ZERO,
trigger = Repeatedly.forever(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(
AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1)))))))
.reduceByKey(ValueType.combineFunction())
.applyPerKeyDoFn(new Test())
.withFixedWindows(
duration = Duration.standardMinutes(5),
options = WindowOptions(
accumulationMode = DISCARDING_FIRED_PANES,
trigger = AfterWatermark.pastEndOfWindow(),
allowedLateness = Duration.ZERO))
.reduceByKey((x, y) => if (x._2.isBefore(y._2)) x else y)
.saveAsCustomOutput(
TextIO.write()...
)
有什么建议吗?
我正在尝试使用Apache Beam(通过Scio)对流源中的最近三天数据(处理时间)进行连续汇总,并从最早的活动窗口中输出结果,每隔...
现在您可以使用现成的滑动窗口以所需的方式对所需的聚合进行分组。