Apache Beam IllegalArgumentException: Unsafe trigger may lose data


With the following windowing function:

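import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Daily windows; early (speculative) panes 30 minutes after the first element of a pane
Window<KV<String, Long>> window = Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardDays(1)))
        .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(30)))
        )
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.standardMinutes(20), Window.ClosingBehavior.FIRE_IF_NON_EMPTY);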

With Beam 2.20.0 we get the following error:

java.lang.IllegalArgumentException: Unsafe trigger may lose data, see https://s.apache.org/finishing-triggers-drop-data: AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour))
    at org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:171)
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:226)
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:110)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:476)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:355)
    at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1596)
    at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1485)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:493)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:368)
    at com.beam.test.monitorAdsUnit$CaculateUnitAbnormalECPM.expand(monitorAdsUnit.java:153)
    at com.beam.test.monitorAdsUnit$CaculateUnitAbnormalECPM.expand(monitorAdsUnit.java:149)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:542)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:493)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:368)
    at com.beam.test.monitorAdsUnit.main(monitorAdsUnit.java:119)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:748)

At the URL in the error message we found the resolution:

The resolution is to disallow finishing triggers as the top-level trigger of a GroupByKey.

Could someone give us a clearer explanation?

Our current workaround is to wrap the top-level trigger in Repeatedly.forever(...).
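A minimal sketch of that workaround, assuming the Beam Java SDK (2.20.0, as in the error above) and the window parameters from the question. The class and method names (RepeatedlyWorkaround, finishingTrigger, nonFinishingTrigger, dailyWindow) are hypothetical; only the Beam calls themselves are real API. The point is that Repeatedly.forever(...) wraps the whole AfterWatermark trigger, so the top-level trigger never finishes.

```java
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

public class RepeatedlyWorkaround {

  // The trigger from the question: early panes 30 minutes (processing time) after the first
  // element of a pane, then one on-time pane when the watermark passes the end of the window.
  // After that on-time pane the trigger is finished, so later data would be dropped.
  static Trigger finishingTrigger() {
    return AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(30)));
  }

  // The workaround: repeat the whole trigger forever, so it never finishes and late elements
  // (within the allowed lateness) can still produce panes.
  static Trigger nonFinishingTrigger() {
    return Repeatedly.forever(finishingTrigger());
  }

  // Daily fixed windows using the non-finishing trigger, with the question's 20 minutes of
  // allowed lateness and accumulating panes.
  static Window<KV<String, Long>> dailyWindow() {
    return Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardDays(1)))
        .triggering(nonFinishingTrigger())
        .accumulatingFiredPanes()
        .withAllowedLateness(
            Duration.standardMinutes(20), Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
  }
}
```

The resulting Window would be applied in place of the original one, before the Combine.perKey that appears in the stack trace.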

google-cloud-dataflow apache-beam
1 answer

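Your trigger AfterWatermark.withEarlyFirings(...) fires one final time when the watermark reaches the end of the window, and then drops all later data. Since that is almost certainly data loss, such triggers are now rejected.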

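In your case, you set an allowed lateness with .withAllowedLateness(<20 minutes>). I assume you actually want some output that includes those 20 minutes of late data, but that data will always be dropped: because the trigger has already finished and discards it, the allowed lateness has no effect.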

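Your workaround is the right change: it causes late data to be emitted immediately. For best readability, I recommend almost always using a trigger of this form: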

AfterWatermark.pastEndOfWindow()
    .withEarlyFirings(...)
    .withLateFirings(...)

This clearly lays out the "lifecycle" of the aggregation: one behavior for early/speculative results, one for the on-time result, and one for late/revision results.
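A sketch of that form filled in with the question's parameters, assuming the Beam Java SDK. The late-firing choice AfterPane.elementCountAtLeast(1) (one pane per late element) is an illustrative assumption, not something prescribed above, and the class name EarlyOnTimeLateTrigger is hypothetical. Because late firings are configured, the trigger keeps firing for late data instead of finishing, so the 20 minutes of allowed lateness becomes meaningful.

```java
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

public class EarlyOnTimeLateTrigger {

  // One behavior per phase of the aggregation's lifecycle:
  //   early (speculative): 30 minutes of processing time after the first element in a pane
  //   on-time:             when the watermark passes the end of the window
  //   late (revisions):    after every late element (an illustrative choice)
  static Trigger trigger() {
    return AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(30)))
        .withLateFirings(AfterPane.elementCountAtLeast(1));
  }

  // Same daily windowing as in the question; the 20 minutes of allowed lateness now has an
  // effect because late data is emitted by the late-firing trigger instead of being dropped.
  static Window<KV<String, Long>> dailyWindow() {
    return Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardDays(1)))
        .triggering(trigger())
        .accumulatingFiredPanes()
        .withAllowedLateness(
            Duration.standardMinutes(20), Window.ClosingBehavior.FIRE_IF_NON_EMPTY);
  }
}
```

Compared with wrapping everything in Repeatedly.forever(...), this keeps the early, on-time and late behaviors visibly separate, and each can be tuned independently.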

Downstream, you can inspect the PaneInfo to adjust processing for each of these three cases.
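A minimal sketch of such a downstream step, assuming the Beam Java SDK and that its input is the KV<String, Long> output of the windowed Combine.perKey from the question. The DoFn name LabelByPaneTiming and the label strings are hypothetical; PaneInfo.getTiming() and its EARLY / ON_TIME / LATE / UNKNOWN values are Beam's.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;

public class LabelByPaneTiming extends DoFn<KV<String, Long>, String> {

  // Maps a pane's timing to a label; static so it can be checked without running a pipeline.
  static String label(PaneInfo.Timing timing) {
    switch (timing) {
      case EARLY:
        return "speculative";
      case ON_TIME:
        return "on-time";
      case LATE:
        return "revision";
      default:
        return "unknown";
    }
  }

  // Emits "<label> <key>=<value>" for every aggregated element, using the pane it was fired in.
  @ProcessElement
  public void processElement(ProcessContext c) {
    PaneInfo pane = c.pane();
    KV<String, Long> kv = c.element();
    c.output(label(pane.getTiming()) + " " + kv.getKey() + "=" + kv.getValue());
  }
}
```

It would be applied right after the aggregation, e.g. `.apply(ParDo.of(new LabelByPaneTiming()))`; a real pipeline would more likely route each timing to a different sink than build strings.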
