Spark-delta not working after upgrading to Spark 3.5.0 and Delta 3.1.0

Problem description

I have a Docker project that runs Spark locally, set up as follows:

  • Ubuntu 20.04 (WSL)
  • openjdk:17.0.2
  • Scala 2.12
  • Spark 3.4.0
  • Spark-delta 2.4.0
  • JupyterLab

Everything works fine, but when I upgrade Spark to 3.5.0 and Spark-delta to the compatible 3.1.0 release, I get the error below as soon as I try to create or query a Delta table. Code that creates the Spark session:

from delta import configure_spark_with_delta_pip
from pyspark import SparkConf
from pyspark.sql import SparkSession

spark_conf = SparkConf()
spark_conf.setAll(
    [
        ("spark.master", "spark://spark-master:7077"),
        ("spark.app.name", "spark_app"),
        ("spark.driver.memory", "4g"),
        ("spark.submit.deployMode", "client"),
        ("spark.ui.showConsoleProgress", "true"),
        ("spark.eventLog.enabled", "false"),
        ("spark.logConf", "false"),
        (
            "spark.jars",
            "/usr/lib/delta-core_2.12-3.1.0.jar",
        ),
        ("spark.driver.extraJavaOptions", "-Djava.net.useSystemProxies=true"),
        # Delta Lake SQL extension and catalog
        ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
        (
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        ),
        # Local Derby-backed Hive metastore
        (
            "javax.jdo.option.ConnectionURL",
            "jdbc:derby:;databaseName=/tmp/metastore_db;create=true",
        ),
        ("spark.sql.catalogImplementation", "hive"),
    ]
)
builder = SparkSession.builder.config(conf=spark_conf)
spark_session = configure_spark_with_delta_pip(builder).getOrCreate()

Code that selects data from the Delta table:

df = spark_session.sql(f"""select * from delta_table;""")
df.show()

The error:

Py4JJavaError: An error occurred while calling o57.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4) (172.20.0.4 executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.catalyst.expressions.ScalaUDF.f of type scala.Function1 in instance of org.apache.spark.sql.catalyst.expressions.ScalaUDF
    at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2227)
    at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2191)
    at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1478)
    at java.base/java.io.ObjectInputStream$FieldValues.defaultCheckFieldValues(ObjectInputStream.java:2690)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2497)
    at 
snipped ....
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:472)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:527)
    at jdk.internal.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at  
snipped ..
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1744)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:514)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:472)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:90)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

Is this a compatibility issue?

apache-spark pyspark delta
1 Answer
spark_conf.setAll(
    [
        (
            "spark.jars",
            "/usr/lib/delta-core_2.12-3.1.0.jar",
        ),

Since Delta 3.0 the Spark jars are different: the delta-core artifact was renamed to delta-spark (search for "Delta Spark" on Maven). You can find the complete list of jars there.
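
As an illustration, here is a minimal sketch of how the spark.jars entry from the question could be adapted for Delta 3.1.0, assuming the renamed jars have been downloaded to /usr/lib (the exact paths and the extra delta-storage jar are assumptions to verify against the Maven listing):

# Sketch only: since Delta 3.0 the core artifact is delta-spark_2.12 (formerly
# delta-core_2.12), and delta-storage ships as a separate jar.
# The /usr/lib paths below are assumed, matching the layout used in the question.
spark_conf.setAll(
    [
        (
            "spark.jars",
            "/usr/lib/delta-spark_2.12-3.1.0.jar,/usr/lib/delta-storage-3.1.0.jar",
        ),
        # ... keep the remaining entries from the question unchanged
    ]
)

Alternatively, since the question already calls configure_spark_with_delta_pip, that helper should add the matching io.delta:delta-spark Maven coordinates via spark.jars.packages when the installed delta-spark pip package matches the cluster version, which may make the hard-coded spark.jars path unnecessary.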
