I need to perform a MERGE operation on my Iceberg table. I am using a Jupyter notebook on an AWS EMR setup.
Spark: 3.4.1-amzn-2, Hadoop: 3.3.6, Hive: 3.1.3, EMR release: 6.15.0, Scala: 2.12.15. Based on this, I assume the Iceberg version is 1.4.0 (https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-iceberg.html).
I am following this documentation to set up my Spark configuration:
https://iceberg.apache.org/docs/1.4.0/spark-getting-started/#using-iceberg-in-spark-3
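One way to double-check which Iceberg runtime jar the cluster actually ships (a sketch, assuming the standard EMR path /usr/share/aws/iceberg/lib/ that I also reference in the %%configure cell below) is to list the jars on the master node, since the versioned file name reveals the Iceberg release:

import glob
# List the Iceberg runtime jars bundled with EMR; a versioned jar name,
# if present, confirms which Iceberg release the cluster provides.
for jar in glob.glob("/usr/share/aws/iceberg/lib/iceberg-spark*runtime*.jar"):
    print(jar)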
These are the cells:
Cell 1
%%configure -f
{
    "conf": {
        "spark.jars.packages": "com.microsoft.azure:spark-mssql-connector_2.12:1.2.0",
        "spark.jars": "/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar",
        "spark.submit.pyFiles": "s3://dev-datalake-us-west-2-apps/ci-artifacts/utility_files/utils.py,s3://dev-datalake-us-west-2-apps/ci-artifacts/utility_files/eventlogs_utils.py,s3://dev-datalake-us-west-2-apps/ci-artifacts/utility_files/flowlogs_utils.py"
    }
}
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql.functions import current_timestamp, unix_timestamp
import utils as u
from datetime import datetime, timedelta
spark = SparkSession.builder \
    .appName("spark job") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4.1_2.12.15:1.4.0") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "hdfs:///tmp") \
    .config("spark.hadoop.hive.cli.print.header", "true") \
    .config("spark.sql.caseSensitive", "true") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .config("spark.sql.parquet.int96RebaseModeInWrite", "LEGACY") \
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
    .enableHiveSupport() \
    .getOrCreate()
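Since this runs through Livy, I am not sure whether the .config(...) calls above actually take effect; getOrCreate() may simply return the session Livy already started. A small sanity check against the live session (plain Spark API, nothing EMR-specific assumed):

# If the Iceberg extension does not show up here, the builder configs were
# applied too late and the running session never picked them up.
print(spark.version)
print(spark.conf.get("spark.sql.extensions", "<not set>"))
print(spark.conf.get("spark.sql.catalog.local", "<not set>"))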
Cell 2
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
schema = StructType([
    StructField("cid", StringType(), True),
    StructField("fznum", StringType(), True),
    StructField("satid", StringType(), True),
    StructField("t_str", StringType(), True),
    StructField("sid", StringType(), True),
    StructField("fzid", StringType(), True),
    StructField("measure_name", StringType(), True),
    StructField("time", TimestampType(), True)
])
final_df_to_load = spark.createDataFrame([], schema)
empty_final_df_to_load = final_df_to_load.limit(0)
catalog_name = "local"
database = "default"
table = "iceberg_table"
empty_final_df_to_load.writeTo(
    "{}.{}.{}".format(
        catalog_name,
        database.replace("`", ""),
        table.replace("`", ""),
    )
).using(
    "iceberg"
).partitionedBy(
    "cid",
).tableProperty(
    "location", f'hdfs:///tmp/{database}/{table}',
).tableProperty(
    "write.spark.fanout.enabled", "false",
).tableProperty(
    "write.distribution-mode", "none",
).tableProperty(
    "write.delete.mode", "copy-on-write",
).tableProperty(
    "write.update.mode", "copy-on-write",
).tableProperty(
    "write.merge.mode", "copy-on-write",
).createOrReplace()
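To sanity-check that the table created above is really registered as an Iceberg table in the local catalog, I can inspect it with standard Spark SQL (a sketch; output trimmed):

# Check the table provider and the write.* properties set above.
spark.sql("DESCRIBE TABLE EXTENDED local.default.iceberg_table").show(truncate=False)
spark.sql("SHOW TBLPROPERTIES local.default.iceberg_table").show(truncate=False)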
Cell 4
# skipping the timestream_recs_df code to keep it simple
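# For illustration only: a hypothetical stand-in for timestream_recs_df that
# matches the schema from Cell 2 (my real DataFrame comes from a Timestream
# query that I am omitting here; datetime is already imported in Cell 1).
timestream_recs_df = spark.createDataFrame(
    [("c1", "fz001", "sat01", "2024-01-01T00:00:00", "s001", "f001", "metric",
      datetime(2024, 1, 1, 0, 0, 0))],
    schema,
)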
timestream_recs_df.createOrReplaceTempView("records_data_view")
upsert_query = f"""
MERGE INTO `{catalog_name}`.`{database}`.`{table}` t
USING (SELECT * FROM records_data_view) u
ON t.cid = u.cid
AND t.sid = u.sid
AND t.satid = u.satid
AND t.t_str = u.t_str
AND t.fznum = u.fznum
AND t.fzid = u.fzid
AND t.time = u.time
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
"""
spark.sql(upsert_query)
Error:
An error was encountered:
An error occurred while calling o88.sql.
: org.apache.spark.SparkUnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
at org.apache.spark.sql.errors.QueryExecutionErrors$.ddlUnsupportedTemporarilyError(QueryExecutionErrors.scala:1110)
at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:1005)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:73)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:73)
at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:514)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:171)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:231)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:570)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:231)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:230)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:171)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:164)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:184)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:231)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:570)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:231)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:230)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:181)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:177)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans$5(QueryExecution.scala:316)
at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:692)
at org.apache.spark.sql.execution.QueryExecution.writePlans(QueryExecution.scala:316)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:331)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:285)
at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:264)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:116)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:160)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:250)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:271)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:159)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:554)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:107)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:554)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:530)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:97)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:82)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:221)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:101)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:98)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:662)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Traceback (most recent call last):
File "/mnt/yarn/usercache/livy/appcache/application_1731377445592_0007/container_1731377445592_0007_01_000001/pyspark.zip/pyspark/sql/session.py", line 1440, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery, litArgs), self)
File "/mnt/yarn/usercache/livy/appcache/application_1731377445592_0007/container_1731377445592_0007_01_000001/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1323, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/mnt/yarn/usercache/livy/appcache/application_1731377445592_0007/container_1731377445592_0007_01_000001/pyspark.zip/pyspark/errors/exceptions/captured.py", line 169, in deco
return f(*a, **kw)
File "/mnt/yarn/usercache/livy/appcache/application_1731377445592_0007/container_1731377445592_0007_01_000001/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o88.sql.
: org.apache.spark.SparkUnsupportedOperationException: MERGE INTO TABLE is not supported temporarily.
Please advise on what needs to be done.
Did you find a solution? I'm hitting the same problem when using dbt-glue. Thanks.