问题很简单,当你使用TUMBLING window和append mode,然后窗口关闭仅当下一条消息到达(+水印逻辑)。
在当前的实现中,如果你停止传入流数据,最后一个窗口将永远不会关闭,我们将丢失最后一个窗口数据。
如果新数据停止传入,我们如何强制最后一个窗口关闭 lush?
经营情况:
工作正常,新消息停止传入,下一条消息在 5 小时后到达,客户端将在 5 小时后收到消息,而不是窗口延迟 10 秒。
Spark v3.3.2 问题代码:
kafka_stream_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KAFKA_BROKER) \
.option("subscribe", KAFKA_TOPIC) \
.option("includeHeaders", "true") \
.load()
sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.select(from_json(col("value").cast("string"), json_schema).alias("data"))
.select("data.*")
.withWatermark("dt", "1 seconds")
.groupBy(window("dt", "10 seconds"))
.agg(sum("price"))
)
console = sel \
.writeStream \
.trigger(processingTime='10 seconds') \
.format("console") \
.outputMode("append")\
.start()
我找不到带水印的解决方案,但解决此问题的一种方法是删除水印并应用过滤器。这样你就可以通过过滤器接近水印行为并且没有问题。
sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.select(from_json(col("value").cast("string"), json_schema).alias("data"))
.select("data.*")
.filter("dt" > DateTime.now - Interval("5 minutes") && "dt" < DateTime.now + Interval("5 minutes"))
.groupBy(window("dt", "10 seconds"))
.agg(sum("price"))
)