Spark 2.3.1如何设置结构化流内连接的事件时间范围条件

问题描述 投票:0回答:1

我正在尝试加入流数据集,因为两个数据集都没有时间分量,所以将当前时间戳用于水印。我想在5分钟后清除加入的数据,因为加入将不会再次匹配旧数据。关于带有可选水印的内部联接的文档确实令人困惑。

  1. 在两个输入上定义水印延迟-如下所述

  2. 定义事件时间的约束。这是强制性的以避免无界状态。在这种情况下,我该如何清除它。

val stream1WithWatermark = stream1
                            .withColumn("stream1_processed_time",current_timestamp())
                            .withWatermark("stream1_processed_time","5 minutes")
val stream2WithWatermark = stream2
                            .withColumn("stream2_processed_time",current_timestamp())
                            .withWatermark("stream2_processed_time","5 minutes")
val joinStream = stream1.join(stream2, expr(stream1key = stream2key)).writeStream.....start()

当我使用条件stream1_processed_time >= stream1_processed_time + 5 minutes时,它抛出的错误低于指出2.3.x中的错误的错误。

org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'stream1_processed_time

查看结构化流时遇到的所有问题,结构化流是用于流连接的好主意吗?

apache-spark spark-structured-streaming
1个回答
0
投票
stream1_processed_time >= stream2_processed_time + 5 minutes

stream2_processed_time >= stream1_processed_time + 5 minutes
© www.soinside.com 2019 - 2024. All rights reserved.