使用允许创建 Python 模型的 dbt 功能,我创建了一个模型,该模型从某些 BigQuery 表中读取,执行一些计算并写回 BigQuery。
它使用 dataproc(无服务器提交模式)将模型作为 PySpark 作业提交。
当使用table具体化运行模型时,一切都会按预期进行。但是,当尝试使用 incremental 物化并使用属性
dbt.this
访问当前模型的位置时,代码会中断。
这是错误的代码:
# Processs new rows only
if dbt.is_incremental:
# only new rows compared to max in current table
max_from_this = f"select max(created_at) from {dbt.this}"
df = df.filter(df.created_at >= session.sql(max_from_this).collect()[0][0])
这是错误输出:
df = df.filter(df.created_at >= session.sql(max_from_this).collect()[0][0])
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 1034, in sql
File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
pyspark.sql.utils.AnalysisException: spark_catalog requires a single-part namespace, but got [x, y]
我尝试了一种不同的方法,而不是使用
session.sql
查询表,而是首先使用 session.table
检索表,然后使用返回的数据帧执行简单的计算,但错误仍然存在
我想知道您是否找到了解决这个问题的方法?我正在为同样的问题而苦苦挣扎。