使用 Apache Beam 进行窗口化 - 修复了窗口似乎没有关闭的问题?

问题描述 投票:0回答:2

我们正在尝试在 Apache Beam 管道上使用固定窗口(使用

DirectRunner
)。我们的流程如下:

  1. 从发布/订阅中提取数据
  2. 将 JSON 反序列化为 Java 对象
  3. 带有 5 秒固定窗口的窗口事件
  4. 使用自定义
    CombineFn
    ,将
    Event
    的每个窗口组合成
    List<Event>
  5. 为了测试,只需输出结果即可
    List<Event>

管道代号:

    pipeline
                // Read from pubsub topic to create unbounded PCollection
                .apply(PubsubIO
                    .<String>read()
                    .topic(options.getTopic())
                    .withCoder(StringUtf8Coder.of())
                )

                // Deserialize JSON into Event object
                .apply("ParseEvent", ParDo
                    .of(new ParseEventFn())
                )

                // Window events with a fixed window size of 5 seconds
                .apply("Window", Window
                    .<Event>into(FixedWindows
                        .of(Duration.standardSeconds(5))
                    )
                )

                // Group events by window
                .apply("CombineEvents", Combine
                    .globally(new CombineEventsFn())
                    .withoutDefaults()
                )

                // Log grouped events
                .apply("LogEvent", ParDo
                    .of(new LogEventFn())
                );

我们看到的结果是最后一步永远不会运行,因为我们没有得到任何日志记录。

此外,我们在自定义

System.out.println("***")
类的每个方法中添加了
CombineFn
,以便跟踪这些方法何时运行,但似乎它们也没有运行。

这里的窗口设置不正确吗?我们遵循了在 https://beam.apache.org/documentation/programming-guide/#windowing 找到的示例,它看起来相当简单,但显然缺少一些基本的东西。

任何见解表示赞赏 - 提前致谢!

java google-cloud-dataflow apache-beam
2个回答
13
投票

看起来主要问题确实是缺少触发器 - 窗口正在打开,但没有任何东西告诉它何时发出结果。我们想简单地根据处理时间(而不是事件时间)来设置窗口,因此执行了以下操作:

.apply("Window", Window
    .<Event>into(new GlobalWindows())
    .triggering(Repeatedly
        .forever(AfterProcessingTime
            .pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(5))
        )
    )
    .withAllowedLateness(Duration.ZERO).discardingFiredPanes()
)

本质上,这会创建一个全局窗口,在处理第一个元素后 5 秒触发该窗口发出事件。每次关闭窗口时,一旦收到元素,就会打开另一个窗口。当我们没有

withAllowedLateness
片时,Beam 抱怨了 - 据我所知,这只是告诉它忽略任何迟到的数据。

我的理解可能有点离题,但是上面的代码片段已经解决了我们的问题!


0
投票

根本问题可能并不是真正缺少全局触发因素。虽然这确实会强制触发并且似乎可以解决问题,但更可能是 PubSubIO 未正确设置水印的结果。正如本期https://github.com/apache/beam/issues/19518中所记录的,使用本地运行器时,水印通常根本不会继续。因此,您原来的 5 秒窗口的默认触发器永远不会交叉/触发,因为 Beam 会等待水印的进展,而这不会发生。

© www.soinside.com 2019 - 2024. All rights reserved.