使用spark spark mapPartition时出错[重复]

问题描述 投票:0回答:1

这个问题在这里已有答案:

所以我有这个代码

val expanededDf = io.readInputs().mapPartitions{ 
(iter:Iterator[Row]) => {
    iter.map{
        (item:Row) => {
            val myNewColumn = getUdf($"someColumnOriginal")
            Row.fromSeq(item.toSeq :+(myNewColumn))
            }
     }
 }
 }

我得到一个例外:“无法找到存储在数据集中的类型的编码器。导入spark.implicits.支持原始类型(Int,String等)和产品类型(案例类)。支持序列化其他类型将是在未来的版本中添加。“我的进口是:

import spark.implicits._
import org.apache.spark.sql._

我必须使用UDF,因为函数非常复杂,可以进行一些REST调用。基本上,代码尝试使用特定列值将新列添加到Row中,然后返回数据帧。我曾尝试使用withColumn,但由于我在这里处理数PB的数据,因此速度非常慢。我是火花和斯卡拉的新手,因此如果我的问题非常蹩脚,我会提前道歉。

scala apache-spark rdd user-defined-functions
1个回答
1
投票

首先,withColumn是要走的路,如果它很慢,可能是因为你的工作需要调整,我认为切换到RDD不会让它变得更快。

但无论如何......你不应该引用在RDD的每一行上调用的函数中的DataFrame。

为了更好地理解正在发生的事情,当运行一个火花程序时,有一个驱动程序,它是主程序,还有执行程序,它们是奴隶。奴隶不知道DataFrames,只有驱动程序。

还有一点很重要,当您编写在执行程序中运行的代码时,在引用驱动程序范围内的变量时必须小心。如果你这样做,Spark将尝试序列化它们并将它们发送给Executors。如果它是您想要的并且如果这些对象很小并且Spark知道如何序列化它们就没关系。

在这种情况下,Spark正在尝试序列化$"someColumnOriginal",这是类Column的对象,但它不知道如何失败。在这种情况下,为了使它工作,你必须知道你想要的字段在什么位置,让我们说它在第2位,你会写

 Row.fromSeq(item.toSeq :+ item.get(2))

您可以通过查看架构(如果它可用)(item.schema,rdd.schema)获得该位置,并且因为它是一个int,它可以在循环外完成,Spark将能够序列化它。您可以阅读本文http://www.cakesolutions.net/teamblogs/demystifying-spark-serialisation-error以获取有关序列化的更多信息。

© www.soinside.com 2019 - 2024. All rights reserved.