我正在尝试加入流数据集,因为两个数据集都没有时间分量,所以将当前时间戳用于水印。我想在5分钟后清除加入的数据,因为加入将不会再次匹配旧数据。关于带有可选水印的内部联接的文档确实令人困惑。
说
在两个输入上定义水印延迟-如下所述
定义事件时间的约束。这是强制性的以避免无界状态。在这种情况下,我该如何清除它。
val stream1WithWatermark = stream1
.withColumn("stream1_processed_time",current_timestamp())
.withWatermark("stream1_processed_time","5 minutes")
val stream2WithWatermark = stream2
.withColumn("stream2_processed_time",current_timestamp())
.withWatermark("stream2_processed_time","5 minutes")
val joinStream = stream1.join(stream2, expr(stream1key = stream2key)).writeStream.....start()
当我使用条件stream1_processed_time >= stream1_processed_time + 5 minutes
时,它抛出的错误低于指出2.3.x中的错误的错误。
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'stream1_processed_time
查看结构化流时遇到的所有问题,结构化流是用于流连接的好主意吗?
stream1_processed_time >= stream2_processed_time + 5 minutes
或
stream2_processed_time >= stream1_processed_time + 5 minutes