Task serialization error when using a UDF in Spark


When I create the UDF function shown below, I get a task serialization error. The error only occurs when I run the code in cluster deploy mode with spark-submit; it works fine in spark-shell.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray

// Return the most frequent non-null URL in the array, or null if none remain
def mfnURL(arr: WrappedArray[String]): String = {
  val filterArr = arr.filterNot(_ == null)
  if (filterArr.length == 0)
    return null
  else {
    filterArr.groupBy(identity).maxBy(_._2.size)._1
  }
}

val mfnURLUDF = udf(mfnURL _)

def windowSpec = Window.partitionBy("nodeId", "url", "typology")

val result = df.withColumn("count", count("url").over(windowSpec))
  .orderBy($"count".desc)
  .groupBy("nodeId", "typology")
  .agg(
    first("url"),
    mfnURLUDF(collect_list("source_url")),
    min("minTimestamp"),
    max("maxTimestamp")
  )

I tried adding spark.udf.register("mfnURLUDF", mfnURLUDF), but it did not solve the problem.
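As far as I understand, registering only makes the UDF callable from SQL string expressions; it does not change how the function's closure is serialized. A rough sketch of what registration is actually for (reusing the names above):

// Registration exposes the UDF to SQL strings, e.g. via expr();
// it has no effect on task serialization of the underlying closure
spark.udf.register("mfnURLUDF", mfnURL _)
df.groupBy("nodeId", "typology")
  .agg(expr("mfnURLUDF(collect_list(source_url))"))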

scala apache-spark apache-spark-sql user-defined-functions
1 Answer (score: 2)

You can also try creating the udf this way, as a val holding a function literal. Eta-expanding a method with mfnURL _ can drag the method's enclosing (non-serializable) instance into the closure, which is a common cause of this error under spark-submit; a plain function literal avoids that:

val mfnURL = udf { arr: WrappedArray[String] =>
  val filterArr = arr.filterNot(_ == null)
  // `return` is not legal inside a function literal, so use an if/else expression
  if (filterArr.isEmpty) null
  else filterArr.groupBy(identity).maxBy(_._2.size)._1
}
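With the udf defined this way, the aggregation from the question works unchanged; a minimal usage sketch, assuming the same df and windowSpec as above:

val result = df.withColumn("count", count("url").over(windowSpec))
  .orderBy($"count".desc)
  .groupBy("nodeId", "typology")
  .agg(
    first("url"),
    mfnURL(collect_list("source_url")),  // call the lambda-based UDF directly
    min("minTimestamp"),
    max("maxTimestamp")
  )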