我正在从流表中读取:
df = spark.readStream.option("ignoreChanges", "true").table(hierarchy)
为了简单起见,我们只需要获取列 LEVEL 最高的记录。
最初我只是想计算出最大级别并在 df 上应用过滤器
max_level = df.agg(max(col("LEVEL"))).collect()[0][0]
df = df.filter(col("LEVEL") == max_level)
但我收到此错误:
pyspark.errors.exceptions.captured.AnalysisException: Queries with streaming sources must be executed with writeStream.start()
然后我尝试了一种不同的方法,我将创建另一个数据框并加入它:
max_level = df.agg(max("LEVEL").alias("LEVEL"))
df = df.join(max_level, "LEVEL").select(df.columns)
但我收到此错误:
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;
所以我尝试了水印:
max_level = df.alias("df").withWatermark("UpdateDate", "10 minutes").groupBy(window(col("df.UpdateDate"), "10 minutes", "5 minutes"),col("df.Database")).agg(max("LEVEL").alias("LEVEL"))
df = df.alias("df").join(max_level.alias("max"), "LEVEL").select(*[col("df." + x) for x in df.columns])
但现在我收到此错误:
NameError: name 'window' is not defined
感觉我已经很接近了,也许只是水印有问题,有人可以帮忙吗?
您尝试添加这个吗?
from pyspark.sql.window import Window
更多信息:
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Window.html