How do I handle an Iceberg CommitFailedException after calling the rewrite_data_files procedure?

Problem description

My application receives streaming data and inserts it into an Iceberg table using PySpark. After the stream had been running for a short while, I ran:

spark.sql("CALL local.system.rewrite_data_files('local.db.table')")

Then I ran this:

import datetime

# Expire all snapshots older than the current time
timestamp_ms = int(datetime.datetime.now().timestamp() * 1000)
spark.sql(f"CALL local.system.expire_snapshots('local.db.table', {timestamp_ms})")

The first time this worked fine, but on the second attempt, when I ran rewrite_data_files, I got the following warning again:

24/11/04 15:07:12 WARN Tasks: Retrying task after failure: Version 384 already exists: warehouse/db/table/metadata/v384.metadata.json
org.apache.iceberg.exceptions.CommitFailedException: Version 384 already exists: warehouse/db/table/metadata/v384.metadata.json
        at org.apache.iceberg.hadoop.HadoopTableOperations.renameToFinal(HadoopTableOperations.java:369)
        at org.apache.iceberg.hadoop.HadoopTableOperations.commit(HadoopTableOperations.java:162)
        at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:408)
        at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
        at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
        at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
        at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:382)
        at org.apache.iceberg.spark.source.SparkWrite.commitOperation(SparkWrite.java:233)
        at org.apache.iceberg.spark.source.SparkWrite.access$1300(SparkWrite.java:84)
        at org.apache.iceberg.spark.source.SparkWrite$BatchAppend.commit(SparkWrite.java:301)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:399)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:359)
        at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:225)
        at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:337)
        at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:336)
        at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:225)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
        at org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:196)
        at org.apache.spark.sql.DataFrameWriterV2.append(DataFrameWriterV2.scala:150)
        at jdk.internal.reflect.GeneratedMethodAccessor86.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:1583)
apache-spark pyspark apache-iceberg
1 Answer

After some searching, I found the answer. The gist is that this is only a warning: it indicates that concurrent updates to the table are happening (here, the streaming writer and the maintenance procedure racing to write the next metadata version). Iceberg's retry logic, visible in the stack trace as org.apache.iceberg.util.Tasks, refreshes the table metadata and retries the commit, which eventually succeeds.
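If the retries themselves start failing under heavier concurrency, Iceberg's commit retry behavior can be tuned through table properties. A minimal sketch, assuming the values below are reasonable for your workload (they are illustrative, not a recommendation from the answer):

# Increase the number of commit attempts and the minimum backoff between them.
# commit.retry.num-retries defaults to 4; commit.retry.min-wait-ms defaults to 100.
spark.sql("""
    ALTER TABLE local.db.table SET TBLPROPERTIES (
        'commit.retry.num-retries' = '10',
        'commit.retry.min-wait-ms' = '250'
    )
""")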
