为什么UDF查询失败并出现“Task not serializable”异常?

问题描述 投票:-1回答:2

我已经创建了一个UDF,我试图将它应用于连接中的coalesce结果。理想情况下,我想在加入期间执行此操作:

def foo(value: Double): Double = {
    value / 100
}

val foo = udf(foo _)

df.join(.....)
  .withColumn("value",foo(coalesce(new Column("valueA"), new Column("valueB"))))

但我得到了例外Task not serializable。有办法解决这个问题吗?

scala apache-spark serialization apache-spark-sql
2个回答
1
投票

使用lambda函数使其可序列化。这个例子工作正常。

    import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.coalesce
import org.apache.spark.sql.functions.udf
val central: DataFrame = Seq(
  (1,  Some(2014)),
  (2,  null)
).toDF("key", "year1")

val other1: DataFrame = Seq(
  (1,  2016),
  (2,  2015)
).toDF("key", "year2")
def fooUDF = udf{v: Double => v/100}

val result = central.join(other1, Seq("key"))
  .withColumn("value",fooUDF(coalesce(col("year1"), col("year2"))))

0
投票

但我得到了例外Task not serializable

臭名昭着的“任务不可序列化”例外的原因是def foo(value: Double): Double是一个不可序列化的拥有对象的一部分(可能与SparkSession间接引用一个不可序列化的SparkContext)。

解决方案是将方法定义为“独立”对象的一部分,该对象不引用不可序列化的值。

有办法解决这个问题吗?

请参阅@firas的other answer

© www.soinside.com 2019 - 2024. All rights reserved.