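We are trying to set up an environment with AWS EMR (on EC2), DBT, Spark, and Nessie.
All extensions are installed correctly, and Nessie commands such as “CREATE BRANCH” work on the cluster both from Jupyter and directly from Spark, but they fail when run as part of DBT.
Regular SQL commands work and return the expected responses, but I get a parse error when I try to create or use a branch.
I am using the latest versions I can.
Here is the stack trace: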
ERROR SparkExecuteStatementOperation: Error executing query with a1aab108-eaaa-4c48-9951-5959ca24a038, currentState RUNNING,
org.apache.spark.sql.catalyst.parser.ParseException:
Syntax error at or near 'demo_branch2'(line 3, pos 22)
== SQL ==
/* {"app": "dbt", "dbt_version": "1.5.2", "profile_name": "thrift_tests", "target_name": "dev", "node_id": "model.thrift_tests.my_first_dbt_model"} */
use reference demo_branch2 in dev_catalog
----------------------^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:306) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:143) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:52) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:89) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser.parsePlan(IcebergSparkSqlExtensionsParser.scala:133) ~[iceberg-spark-runtime-3.3_2.12-1.1.0-amzn-0.jar:?]
at org.apache.spark.sql.catalyst.parser.extensions.NessieSparkSqlExtensionsParser.parsePlan(NessieSparkSqlExtensionsParser.scala:114) ~[org.projectnessie.nessie-integrations_nessie-spark-extensions-3.3_2.12-0.51.1.jar:?]
at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:620) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192) ~[spark-catalyst_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:620) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651) ~[spark-sql_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute(SparkExecuteStatementOperation.scala:291) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.$anonfun$run$2(SparkExecuteStatementOperation.scala:230) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties(SparkOperation.scala:79) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.hive.thriftserver.SparkOperation.withLocalProperties$(SparkOperation.scala:63) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation.withLocalProperties(SparkExecuteStatementOperation.scala:43) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:230) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2$$anon$3.run(SparkExecuteStatementOperation.scala:225) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_372]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_372]
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878) ~[hadoop-client-api-3.3.3-amzn-2.jar:?]
at org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$2.run(SparkExecuteStatementOperation.scala:239) ~[spark-hive-thriftserver_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_372]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_372]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_372]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_372]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_372]
EMR (6.10) configuration:
[
  {
    "Classification": "iceberg-defaults",
    "Properties": {
      "iceberg.enabled": "true"
    }
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.3_2.12:0.63.0",
      "spark.sql.catalog.dev_catalog": "org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.dev_catalog.auth_type": "NONE",
      "spark.sql.catalog.dev_catalog.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
      "spark.sql.catalog.dev_catalog.ref": "main",
      "spark.sql.catalog.dev_catalog.uri": "https://nessie-dev.dev.XYZ.cloud/api/v1",
      "spark.sql.catalog.dev_catalog.warehouse": "s3://.../nessie_catalog/",
      "spark.sql.defaultCatalog": "dev_catalog",
      "spark.sql.execution.pyarrow.enabled": "true",
      "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions"
    }
  }
]
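I tried running commands unrelated to Nessie, and they succeeded.
I tried running Nessie commands through Spark, and they succeeded.
Everything works as expected from Jupyter.
I tried changing the Thrift server configuration to disable validation (a suggestion from GPT; I could not confirm it myself, so it may be something GPT made up).
I tried running Iceberg commands through DBT, and they worked.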
Note that DBT prefixes the SQL it sends to Spark with the following comment:
/* {"app": "dbt", "dbt_version": "1.5.2", "profile_name": "thrift_tests", "target_name": "dev", "node_id": "model.thrift_tests.my_first_dbt_model"} */
This comment causes no problems when the final SQL is standard, but it does break Nessie commands such as USE REFERENCE and LIST REFERENCES.
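To check whether the prefix alone triggers the parse error, the statement can be rebuilt in the shape DBT sends it and run outside DBT. The sketch below is a hedged illustration: the helper name with_dbt_comment is hypothetical, and the exact layout DBT uses (comment, newline, statement) is an assumption based on the log above.

```python
import json


def with_dbt_comment(sql: str, metadata: dict) -> str:
    """Prefix sql with a JSON block comment, mimicking dbt's query comment.

    The layout (comment, newline, statement) is an assumption taken from
    the stack trace, not dbt's documented format.
    """
    return "/* " + json.dumps(metadata) + " */\n" + sql


if __name__ == "__main__":
    # Metadata values copied from the log; trimmed for brevity.
    statement = with_dbt_comment(
        "use reference demo_branch2 in dev_catalog",
        {"app": "dbt", "dbt_version": "1.5.2"},
    )
    print(statement)
```

In a Jupyter session on the cluster, passing the result to spark.sql(...) and comparing it with the bare statement should show whether the comment is what the Nessie parser rejects.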
To work around this, we had to override the execute function in dbt.adapters.spark.session and strip the comment.
Here is how we did it:
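from typing import Any
from dbt.adapters.spark.session import Cursor
from dbt.exceptions import DbtRuntimeError
from pyspark.sql.utils import AnalysisException

original_execute = Cursor.execute

def execute(self, sql: str, *parameters: Any) -> None:
    try:
        # Strip dbt's leading query comment, but only when one is present.
        if sql.lstrip().startswith("/*") and "*/" in sql:
            sql = sql[sql.find("*/") + 2:].lstrip()
        original_execute(self, sql, *parameters)
    except AnalysisException as exc:
        raise DbtRuntimeError(str(exc)) from exc

Cursor.execute = execute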
After applying the override above, we ran DBT from code and it succeeded. The AnalysisException wrapping relates to a separate bug listed in the dbt-spark repository.
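The stripping step can also be pulled out into a small pure function, which makes it easy to unit-test without a Spark session. This is a minimal sketch, not part of dbt-spark: the name strip_leading_comment is hypothetical, and it assumes the query comment is a single /* ... */ block at the very start of the statement.

```python
def strip_leading_comment(sql: str) -> str:
    """Return sql without a leading /* ... */ block comment, if there is one."""
    stripped = sql.lstrip()
    if not stripped.startswith("/*"):
        # No leading comment: leave the statement untouched.
        return sql
    end = stripped.find("*/")
    if end == -1:
        # Unterminated comment: do not guess, pass the statement through.
        return sql
    # Drop the comment and any whitespace that follows it.
    return stripped[end + 2:].lstrip()


if __name__ == "__main__":
    prefixed = '/* {"app": "dbt"} */\nuse reference demo_branch2 in dev_catalog'
    print(strip_leading_comment(prefixed))
```

Statements without a leading comment are returned unchanged, so the helper is safe to call on every query.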
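As @user502490 suggested, the problem stems from the query comment inserted at the start of every query. An alternative fix is to set the query-comment field in dbt_project.yml to null, which stops DBT from adding the metadata comment to any query.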
# dbt_project.yml
...
query-comment: null
...