DLT 水印名称“窗口”未定义

问题描述 投票:0回答:1

我正在从流表中读取:

df = spark.readStream.option("ignoreChanges", "true").table(hierarchy)

为了简单起见,我们只需要获取列 LEVEL 最高的记录。

最初我只是想计算出最大级别并在 df 上应用过滤器

max_level = df.agg(max(col("LEVEL"))).collect()[0][0]
df = df.filter(col("LEVEL") == max_level)

但我收到此错误:

pyspark.errors.exceptions.captured.AnalysisException: Queries with streaming sources must be executed with writeStream.start()

然后我尝试了一种不同的方法,我将创建另一个数据框并加入它:

max_level = df.agg(max("LEVEL").alias("LEVEL"))
df = df.join(max_level, "LEVEL").select(df.columns)

但我收到此错误:

Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;

所以我尝试了水印:

max_level = df.alias("df").withWatermark("UpdateDate", "10 minutes").groupBy(window(col("df.UpdateDate"), "10 minutes", "5 minutes"),col("df.Database")).agg(max("LEVEL").alias("LEVEL"))
df = df.alias("df").join(max_level.alias("max"), "LEVEL").select(*[col("df." + x) for x in df.columns])

但现在我收到此错误:

NameError: name 'window' is not defined

感觉我已经很接近了,也许只是水印有问题,有人可以帮忙吗?

databricks spark-streaming azure-databricks delta-live-tables dlt
1个回答
0
投票

您尝试添加这个吗?

from pyspark.sql.window import Window

更多信息:

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Window.html

© www.soinside.com 2019 - 2024. All rights reserved.