'线程“ dispatcher-event-loop-0”中的异常java.lang.OutOfMemoryError:Spark Scala代码中的Java堆空间'错误

问题描述 投票:0回答:1
val data = spark.read
    .text(filePath)
    .toDF("val")
    .withColumn("id", monotonically_increasing_id())



    val count = data.count()



    val header = data.where("id==1").collect().map(s => s.getString(0)).apply(0)



    val columns = header
    .replace("H|*|", "")
    .replace("|##|", "")
    .split("\\|\\*\\|")


    val structSchema = StructType(columns.map(s=>StructField(s, StringType, true)))



    var correctData = data.where('id > 1 && 'id < count-1).select("val")
    var dataString = correctData.collect().map(s => s.getString(0)).mkString("").replace("\\\n","").replace("\\\r","")
    var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{ 
                                                          var arr = s.split("\\|\\*\\|")
                                                          while(arr.length < columns.length) arr = arr :+ ""
                                                          RowFactory.create(arr:_*)
                                                         })
    val finalDF = spark.createDataFrame(sc.parallelize(dataArr),structSchema)

    display(finalDF)

这部分代码给出错误:

线程“ dispatcher-event-loop-0”中的异常java.lang.OutOfMemoryError:Java堆空间

经过数小时的调试主要部分:

var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{ 
                                                          var arr = s.split("\\|\\*\\|")
                                                          while(arr.length < columns.length) arr = arr :+ ""
                                                          RowFactory.create(arr:_*)
                                                         })
    val finalDF = spark.createDataFrame(sc.parallelize(dataArr),structSchema)

导致错误。

我将零件更改为

var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{
                                                          var arr = s.split("\\|\\*\\|")
                                                          while(arr.length < columns.length) arr = arr :+ ""
                                                          RowFactory.create(arr:_*)
                                                         }).toList
  val finalDF = sqlContext.createDataFrame(sc.makeRDD(dataArr),structSchema)

但是错误仍然相同。我应该怎么做才能避免这种情况?

当我运行此代码是databricks spark集群时,特定的工作导致此Spark驱动程序错误:

作业因阶段故障而中止:序列化任务45:0为792585456字节,超过了允许的最大值:spark.rpc.message.maxSize(268435456字节)。

我添加了这部分代码:

spark.conf.set("spark.rpc.message.maxSize",Int.MaxValue)

但没有用。

java scala apache-spark apache-spark-sql out-of-memory
1个回答
0
投票

我的猜测是

var dataString = correctData.collect().map(s => s.getString(0)).mkString("").replace("\\\n","").replace("\\\r","")

是问题,因为您将(几乎)所有数据收集到驱动程序,即收集到一个单一的JVM。

也许这条线在运行,但是对dataString的后续操作将超出您的内存限制。您不应该收集您的数据!而是使用分布式的“数据结构”,例如Dataframe或RDD。

我想您可以省略上一行中的collect

© www.soinside.com 2019 - 2024. All rights reserved.