从流媒体源消费时,
.group_by(col('name'))
)(参见此处)name
值的表格(在下面的示例中['Alice', 'Bob']
)在
windowing+group_by
之后,聚合数据流对于每个 window/name
与事件的组合都有一行。我怎样才能填充流以返回每个时间窗口中的每个键来填充丢失的数据点,比如 0?
在下面找到一个最小的工作示例,我想对其进行修改,以便在每个窗口中获得一行
Alice
和 Bob
1s
、2s
、3s
、4s
。
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
# define the source with watermark definition
ds = env.from_collection(
collection=[
(Instant.of_epoch_milli(1000), 'Alice', 110.1),
(Instant.of_epoch_milli(2000), 'Bob', 53.1),
(Instant.of_epoch_milli(3000), 'Bob', 3.1),
(Instant.of_epoch_milli(4000), 'Bob', 30.2),
],
type_info=Types.ROW([Types.INSTANT(), Types.STRING(), Types.FLOAT()]))
table = t_env.from_data_stream(
ds,
Schema.new_builder()
.column_by_expression("ts", "CAST(f0 AS TIMESTAMP(3))")
.column("f1", DataTypes.STRING())
.column("f2", DataTypes.FLOAT())
.watermark("ts", "ts")
.build()
).alias("ts", "name", "price")
# define the tumble window operation
table = (
table
.window(Tumble.over(lit(1).seconds).on(col("ts")).alias("w"))
.group_by(col('name'), col('w'))
.select(
col('name'),
col("w").start.alias('window'),
col('price').sum.alias('result'),
)
)
# submit for execution
table.execute().print()
解决“空窗口”问题的标准方法是使用一个自定义源,为每个窗口、每个唯一键发出一个假事件,并在适当的时间。然后,在执行
.group_by()
之前,将该流与真实流合并。由于您有一个包含所有唯一键值的表,因此您可以在源中使用它。
对于这样的源有 this 解决方案,尽管它是 Scala 代码。