GroupBy 窗口聚合:处理空窗口

问题描述 投票:0回答:1

从流媒体源消费时,

  1. 我们使用 Groupby 窗口聚合:
    .group_by(col('name'))
    )(参见此处
  2. 并有一个包含所有可能的
    name
    值的表格(在下面的示例中
    ['Alice', 'Bob']

windowing+group_by
之后,聚合数据流对于每个
window/name
与事件的组合都有一行。我怎样才能填充流以返回每个时间窗口中的每个键来填充丢失的数据点,比如 0?

在下面找到一个最小的工作示例,我想对其进行修改,以便在每个窗口中获得一行

Alice
Bob
1s
2s
3s
4s

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    # define the source with watermark definition
    ds = env.from_collection(
        collection=[
            (Instant.of_epoch_milli(1000), 'Alice', 110.1),
            (Instant.of_epoch_milli(2000), 'Bob', 53.1),
            (Instant.of_epoch_milli(3000), 'Bob', 3.1),
            (Instant.of_epoch_milli(4000), 'Bob', 30.2),
        ],
        type_info=Types.ROW([Types.INSTANT(), Types.STRING(), Types.FLOAT()]))

    table = t_env.from_data_stream(
        ds,
        Schema.new_builder()
              .column_by_expression("ts", "CAST(f0 AS TIMESTAMP(3))")
              .column("f1", DataTypes.STRING())
              .column("f2", DataTypes.FLOAT())
              .watermark("ts", "ts")
              .build()
    ).alias("ts", "name", "price")

    # define the tumble window operation
    table = (
        table
        .window(Tumble.over(lit(1).seconds).on(col("ts")).alias("w"))
        .group_by(col('name'), col('w'))
        .select(
            col('name'),
            col("w").start.alias('window'),
            col('price').sum.alias('result'),
        )
    )

    # submit for execution
    table.execute().print()
apache-flink flink-streaming pyflink
1个回答
0
投票

解决“空窗口”问题的标准方法是使用一个自定义源,为每个窗口、每个唯一键发出一个假事件,并在适当的时间。然后,在执行

.group_by()
之前,将该流与真实流合并。由于您有一个包含所有唯一键值的表,因此您可以在源中使用它。

对于这样的源有 this 解决方案,尽管它是 Scala 代码。

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.