With the following windowing function:
```java
Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardDays(1)))
    .triggering(
        AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(30))))
    .accumulatingFiredPanes()
    .withAllowedLateness(Duration.standardMinutes(20), Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
```
we get the following error with Beam 2.20.0:
```
java.lang.IllegalArgumentException: Unsafe trigger may lose data, see https://s.apache.org/finishing-triggers-drop-data: AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour))
at org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:171)
at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:226)
at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:110)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:476)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:355)
at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1596)
at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1485)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:493)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:368)
at com.beam.test.monitorAdsUnit$CaculateUnitAbnormalECPM.expand(monitorAdsUnit.java:153)
at com.beam.test.monitorAdsUnit$CaculateUnitAbnormalECPM.expand(monitorAdsUnit.java:149)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:493)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:368)
at com.beam.test.monitorAdsUnit.main(monitorAdsUnit.java:119)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
at java.lang.Thread.run(Thread.java:748)
```
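At the URL above we found this solution:

The solution is to disallow finishing triggers for a top-level `GroupByKey` transform.

Could someone give us a clearer explanation?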
Currently, our workaround is to wrap the top-level trigger in `Repeatedly.forever(...)`.
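For concreteness, that workaround might look like the sketch below. It assumes the Beam 2.20.0 Java SDK and Joda-Time on the classpath; the class and method names are hypothetical. `Repeatedly.forever(...)` resets its inner trigger after every firing, so the resulting trigger never finishes and the window can keep producing panes for late data.

```java
// Sketch only: assumes the Beam Java SDK (2.20.0) and Joda-Time are on the classpath.
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.joda.time.Duration;

// Hypothetical helper class; only the Beam calls themselves are real API.
class RepeatedTriggerWorkaround {
  static Trigger repeatedTrigger() {
    // The inner trigger would finish after the on-time firing; wrapping it in
    // Repeatedly.forever(...) resets it after each firing, so it never finishes.
    return Repeatedly.forever(
        AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(30))));
  }
}
```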
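Your trigger, `AfterWatermark.pastEndOfWindow().withEarlyFirings(...)`, fires one last time when the watermark reaches the end of the window and then finishes; all data that arrives for that window afterwards is dropped. Because this is almost certainly data loss, such triggers are disallowed.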
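In your case, you set an allowed lateness with `.withAllowedLateness(<20 minutes>)`. I assume you actually want some output that includes those 20 minutes of late data, but that data will always be dropped: the trigger has already finished and discards it, so the allowed lateness has no effect.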
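To make this argument concrete, here is a toy model in plain Java. It is not Beam's implementation: the class `FinishingTriggerDemo` is hypothetical, times are minutes from the start of a one-day window (so the window ends at 1440), and early firings, accumulation mode and exact window-boundary semantics are ignored. It only contrasts a trigger that finishes at the end of the window with one that keeps firing for late data.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (hypothetical, NOT Beam's implementation) of one window's lifecycle.
 * Times are minutes from the start of the window.
 */
final class FinishingTriggerDemo {

    /** What happened to each element fed into the window. */
    static final class Outcome {
        final List<String> emitted = new ArrayList<>();
        final List<String> dropped = new ArrayList<>();
    }

    /**
     * @param values          element values in arrival order
     * @param watermarks      watermark at the moment each value arrives (non-decreasing)
     * @param windowEnd       end of the window
     * @param allowedLateness allowed lateness after the window end
     * @param finishing       true: trigger finishes after the on-time firing;
     *                        false: trigger keeps firing for late data
     */
    static Outcome run(String[] values, long[] watermarks, long windowEnd,
                       long allowedLateness, boolean finishing) {
        Outcome out = new Outcome();
        List<String> pane = new ArrayList<>(); // on-time elements buffered until the window end
        boolean onTimeFired = false;
        for (int i = 0; i < values.length; i++) {
            long wm = watermarks[i];
            if (!onTimeFired && wm >= windowEnd) {
                // The watermark passed the end of the window: emit the on-time pane.
                out.emitted.addAll(pane);
                pane.clear();
                onTimeFired = true;
            }
            if (wm >= windowEnd + allowedLateness) {
                out.dropped.add(values[i]); // window expired: dropped whatever the trigger
            } else if (!onTimeFired) {
                pane.add(values[i]);        // on time: wait for the on-time firing
            } else if (finishing) {
                out.dropped.add(values[i]); // late but allowed, yet the trigger is finished
            } else {
                out.emitted.add(values[i]); // late but allowed: emitted in a late pane
            }
        }
        if (!onTimeFired) {
            out.emitted.addAll(pane);       // input ended before the window closed
        }
        return out;
    }

    public static void main(String[] args) {
        String[] values = {"a", "b", "c", "d"};
        long[] watermarks = {100, 1430, 1450, 1470}; // c is 10 min late, d is 30 min late
        Outcome fin = run(values, watermarks, 1440, 20, true);
        Outcome rep = run(values, watermarks, 1440, 20, false);
        System.out.println("finishing: emitted=" + fin.emitted + " dropped=" + fin.dropped);
        System.out.println("repeating: emitted=" + rep.emitted + " dropped=" + rep.dropped);
    }
}
```

Running `main` prints `finishing: emitted=[a, b] dropped=[c, d]` and `repeating: emitted=[a, b, c] dropped=[d]`: element `c` falls inside the 20-minute allowed lateness but is lost with the finishing trigger. With `allowedLateness` set to 0 both variants give the same result, which matches the intuition that a finishing trigger only loses data when there is allowed lateness for it to ignore.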
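Your workaround is the right change: it causes late data to be emitted immediately. For the best readability, I recommend almost always using a trigger of this form: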
```java
AfterWatermark.pastEndOfWindow()
    .withEarlyFirings(...)
    .withLateFirings(...)
```
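This clearly lays out the "lifecycle" of the aggregation: you have one behavior for early/speculative results, one for the on-time result, and one for late/revised results.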
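Applied to the windowing from the question, that shape might look like the sketch below. It assumes the Beam 2.20.0 Java SDK and Joda-Time; the class and method names are hypothetical, and `AfterPane.elementCountAtLeast(1)` is only one possible late trigger (it fires a late pane as soon as late data arrives). Apart from `withLateFirings(...)`, the settings mirror the question's.

```java
// Sketch only: assumes the Beam Java SDK (2.20.0) and Joda-Time are on the classpath.
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Hypothetical helper class; only the Beam calls themselves are real API.
class DailyWindowing {
  static Window<KV<String, Long>> dailyWindow() {
    return Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardDays(1)))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                // Early/speculative panes: 30 minutes after the first element of a pane.
                .withEarlyFirings(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(30)))
                // Late/revised panes: fire as soon as late data arrives.
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .accumulatingFiredPanes()
        // Late data is accepted for 20 minutes after the end of the window.
        .withAllowedLateness(
            Duration.standardMinutes(20), Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
  }
}
```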
Downstream, you can inspect `PaneInfo` to tailor processing to each of these three cases.
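For example, a downstream `DoFn` could branch on `PaneInfo.getTiming()`, which reports `EARLY`, `ON_TIME`, `LATE` or `UNKNOWN`. This is a sketch assuming the Beam Java SDK; the class name `LabelByPaneTiming` and its output string format are hypothetical.

```java
// Sketch only: assumes the Beam Java SDK is on the classpath.
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;

/** Hypothetical DoFn that labels each aggregate with the lifecycle stage of its pane. */
class LabelByPaneTiming extends DoFn<KV<String, Long>, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<String, Long> kv = c.element();
    PaneInfo pane = c.pane();
    String stage;
    switch (pane.getTiming()) {
      case EARLY:   stage = "speculative"; break; // produced by the early firings
      case ON_TIME: stage = "on-time";     break; // watermark passed the end of the window
      case LATE:    stage = "revised";     break; // produced by the late firings
      default:      stage = "unknown";     break; // PaneInfo.Timing.UNKNOWN
    }
    c.output(stage + " " + kv.getKey() + "=" + kv.getValue() + " (pane " + pane.getIndex() + ")");
  }
}
```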