Spark 最后一个窗口在追加模式下不刷新

问题描述 投票:0回答:1

问题很简单,当你使用TUMBLING windowappend mode,然后窗口关闭仅当下一条消息到达(+水印逻辑)。

在当前的实现中,如果你停止传入流数据,最后一个窗口将永远不会关闭,我们将丢失最后一个窗口数据。

如果新数据停止传入,我们如何强制最后一个窗口关闭 lush?

经营情况:

工作正常,新消息停止传入,下一条消息在 5 小时后到达,客户端将在 5 小时后收到消息,而不是窗口延迟 10 秒。

Spark v3.3.2 问题代码:

kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()

sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
       .select(from_json(col("value").cast("string"), json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )




console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append")\
    .start()
apache-spark pyspark apache-spark-sql spark-streaming spark-structured-streaming
1个回答
0
投票

我找不到带水印的解决方案,但解决此问题的一种方法是删除水印并应用过滤器。这样你就可以通过过滤器接近水印行为并且没有问题。

sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
       .select(from_json(col("value").cast("string"), json_schema).alias("data"))
       .select("data.*")
       .filter("dt" > DateTime.now - Interval("5 minutes") && "dt" < DateTime.now + Interval("5 minutes"))
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )
最新问题
© www.soinside.com 2019 - 2024. All rights reserved.