val data = spark.read
.text(filePath)
.toDF("val")
.withColumn("id", monotonically_increasing_id())
val count = data.count()
val header = data.where("id==1").collect().map(s => s.getString(0)).apply(0)
val columns = header
.replace("H|*|", "")
.replace("|##|", "")
.split("\\|\\*\\|")
val structSchema = StructType(columns.map(s=>StructField(s, StringType, true)))
var correctData = data.where('id > 1 && 'id < count-1).select("val")
var dataString = correctData.collect().map(s => s.getString(0)).mkString("").replace("\\\n","").replace("\\\r","")
var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{
var arr = s.split("\\|\\*\\|")
while(arr.length < columns.length) arr = arr :+ ""
RowFactory.create(arr:_*)
})
val finalDF = spark.createDataFrame(sc.parallelize(dataArr),structSchema)
display(finalDF)
这部分代码给出错误:
线程“ dispatcher-event-loop-0”中的异常java.lang.OutOfMemoryError:Java堆空间
经过数小时的调试主要部分:
var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{
var arr = s.split("\\|\\*\\|")
while(arr.length < columns.length) arr = arr :+ ""
RowFactory.create(arr:_*)
})
val finalDF = spark.createDataFrame(sc.parallelize(dataArr),structSchema)
导致错误。
我将零件更改为
var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{
var arr = s.split("\\|\\*\\|")
while(arr.length < columns.length) arr = arr :+ ""
RowFactory.create(arr:_*)
}).toList
val finalDF = sqlContext.createDataFrame(sc.makeRDD(dataArr),structSchema)
但是错误仍然相同。我应该怎么做才能避免这种情况?
当我运行此代码是databricks spark集群时,特定的工作导致此Spark驱动程序错误:
作业因阶段故障而中止:序列化任务45:0为792585456字节,超过了允许的最大值:spark.rpc.message.maxSize(268435456字节)。
我添加了这部分代码:
spark.conf.set("spark.rpc.message.maxSize",Int.MaxValue)
但没有用。
我的猜测是
var dataString = correctData.collect().map(s => s.getString(0)).mkString("").replace("\\\n","").replace("\\\r","")
是问题,因为您将(几乎)所有数据收集到驱动程序,即收集到一个单一的JVM。
也许这条线在运行,但是对dataString
的后续操作将超出您的内存限制。您不应该收集您的数据!而是使用分布式的“数据结构”,例如Dataframe或RDD。
我想您可以省略上一行中的collect