When I create the UDF shown below, I get a task serialization error ("Task not serializable"). The error occurs only when I run the code with spark-submit in cluster deploy mode; it works fine in spark-shell.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import scala.collection.mutable.WrappedArray

// Returns the most frequent non-null URL in the array, or null if none remain
def mfnURL(arr: WrappedArray[String]): String = {
  val filterArr = arr.filterNot(_ == null)
  if (filterArr.length == 0)
    return null
  else {
    filterArr.groupBy(identity).maxBy(_._2.size)._1
  }
}
val mfnURLUDF = udf(mfnURL _)
def windowSpec = Window.partitionBy("nodeId", "url", "typology")

val result = df.withColumn("count", count("url").over(windowSpec))
  .orderBy($"count".desc)
  .groupBy("nodeId", "typology")
  .agg(
    first("url"),
    mfnURLUDF(collect_list("source_url")),
    min("minTimestamp"),
    max("maxTimestamp")
  )
I tried adding spark.udf.register("mfnURLUDF", mfnURLUDF), but it did not fix the problem.
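Registering the UDF by name only makes it callable from SQL-string expressions; it does not change how the function itself is serialized, so registration alone would not be expected to help here. A minimal sketch of what registration actually affects, reusing the column names from the question:

// Registration matters only for SQL-string usage such as the expr() call below;
// the function body is shipped to executors the same way in both cases.
spark.udf.register("mfnURLUDF", mfnURLUDF)

df.groupBy("nodeId", "typology")
  .agg(expr("mfnURLUDF(collect_list(source_url))").as("mfn_url"))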
You could also try creating the UDF this way:
val mfnURL = udf { arr: WrappedArray[String] =>
  // Note: no `return` keyword inside the lambda
  val filterArr = arr.filterNot(_ == null)
  if (filterArr.length == 0) null
  else filterArr.groupBy(identity).maxBy(_._2.size)._1
}
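Two differences likely matter. First, udf(mfnURL _) eta-expands a method, and a method reference can drag the enclosing instance (`this`) into the closure; if that enclosing class or object is not serializable, the task fails in cluster mode even though the same code happens to work in spark-shell. A standalone function literal like the one above has no enclosing instance to capture. Second, the `return` keyword has to be dropped: inside an anonymous function it does not even compile, and inside nested closures Scala implements it by throwing an exception, so plain expressions are the safer choice. With the fixed UDF, the original aggregation can be reused unchanged; a sketch, assuming the same df and windowSpec as in the question:

// Usage sketch with the function-literal UDF; no enclosing `this` is captured.
val result = df.withColumn("count", count("url").over(windowSpec))
  .orderBy($"count".desc)
  .groupBy("nodeId", "typology")
  .agg(
    first("url"),
    mfnURL(collect_list("source_url")),
    min("minTimestamp"),
    max("maxTimestamp")
  )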