spark_catalog 需要 dbt python 增量模型中的单部分命名空间

问题描述 投票:0回答:1

描述:

  • 使用允许创建 Python 模型的 dbt 功能,我创建了一个模型,该模型从某些 BigQuery 表中读取,执行一些计算并写回 BigQuery。

  • 它使用 dataproc(无服务器提交模式)将模型作为 PySpark 作业提交。

问题

当使用table具体化运行模型时,一切都会按预期进行。但是,当尝试使用 incremental 物化并使用属性

dbt.this
访问当前模型的位置时,代码会中断。

这是错误的代码:

# Processs new rows only
if dbt.is_incremental:
    # only new rows compared to max in current table
    max_from_this = f"select max(created_at) from {dbt.this}"
    df = df.filter(df.created_at >= session.sql(max_from_this).collect()[0][0])

这是错误输出:

df = df.filter(df.created_at >= session.sql(max_from_this).collect()[0][0])
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 1034, in sql
File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
pyspark.sql.utils.AnalysisException: spark_catalog requires a single-part namespace, but got [x, y]

参考资料:

我尝试了一种不同的方法,而不是使用

session.sql
查询表,而是首先使用
session.table
检索表,然后使用返回的数据帧执行简单的计算,但错误仍然存在

python pyspark google-cloud-dataproc dbt
1个回答
0
投票

我想知道您是否找到了解决这个问题的方法?我正在为同样的问题而苦苦挣扎。

© www.soinside.com 2019 - 2024. All rights reserved.