[我正在尝试使用Scala和Spark解析一组XML文件。我从文件中获取了'n'个数据帧的数据。(即,数据帧的数量没有变化,只有文件数量有所变化)
我正在解析一组XML文件,并将数据存储在ListBuffer[ListBuffer[String]]
中。每个ListBuffer[String]
都包含一个数据帧的数据。例如:
ListBuffer[
ListBuffer["1|2|3|4","5|6|7|8"],
ListBuffer["a|b|c|d","e|f|g|h"],
ListBuffer["q|w|e|r","w|x|y|z"]
]
这将创建3个数据框:
Dataframe1:
col1 col2 col3 col4
1 2 3 4
5 6 7 8
以及类似的其他2个数据框。
我无法直接将XML转换为Dataframe,因为在制作Dataframe之前,数据中需要进行很多自定义处理。
我正在使用以下代码将ListBuffer转换为Dataframe:
finalListBuffer.foreach{ data =>
columns = FunctionToReturnColumnsList()
val schema = StructType(columns.map(field => StructField(field, StringType, true)))
val dataRDD: RDD[Row] = sparkSess.sparkContext.parallelize(data.toStream.map(l => Row.fromSeq(l.split("|", -1))))
val df = sparkSess.createDataFrame(dataRDD, schema)
...
}
在此步骤之后,对每个数据帧执行一些操作((某些操作具有数据帧之间的相关性,因此我不能只处理一个数据帧,然后写入),最后使用以下代码写入数据帧:
df.repartition(1).write.mode("Overwrite").option("multiline", "true").option("delimiter", "\u0017").csv(filename)
在执行这些步骤时,当输入文件大小很大时,我会遇到两个问题:
1)创建数据帧时,超出了GC开销限制。(创建dataRDD
变量的步骤)
2)写入df时发生火花心跳超时错误。>>
如何解决这些问题?
我最初考虑使用ListBuffer[RDD[String]]
,而不是ListBuffer[ListBuffer[String]]
但是最多可以有100万个文件,每个文件最多可以有10-20个df条目。我正在做的是,我列出所有文件,并逐个处理每个文件,并将其结果附加到主ListBuffer中。因此,如果我使用的是RDD,则必须为每个文件使用union,这可能会很昂贵。还有什么可以做的?
[我正在尝试使用Scala和Spark解析一组XML文件。我从文件中获取了'n'个数据帧的数据。(即,数据帧的数量不变,只有文件数量不同),我正在解析一组XML ...
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ListBuffer
scala> val lbs = ListBuffer(
| ListBuffer("1|2|3|4","5|6|7|8"),
| ListBuffer("a|b|c|d","e|f|g|h"),
| ListBuffer("q|w|e|r","w|x|y|z")
| )
lbs: scala.collection.mutable.ListBuffer[scala.collection.mutable.ListBuffer[String]] = ListBuffer(ListBuffer(1|2|3|4, 5|6|7|8), ListBuffer(a|b|c|d, e|f|g|h), ListBuffer(q|w|e|r, w|x|y|z))
scala> val schema = StructType(Seq(StructField("c1", StringType, true),StructField("c2", StringType, true),StructField("c3", StringType, true),StructField("c4", StringType, true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(c1,StringType,true), StructField(c2,StringType,true), StructField(c3,StringType,true), StructField(c4,StringType,true))
scala> var lb_df: ListBuffer[DataFrame] = ListBuffer()
lb_df: scala.collection.mutable.ListBuffer[org.apache.spark.sql.DataFrame] = ListBuffer()
scala> def createDF(lb: ListBuffer[String]) = spark.createDataFrame(spark.sparkContext.parallelize(lb.toSeq).map(_.toString.split("\\|")).map(Row(_: _*)), schema)
createDF: (lb: scala.collection.mutable.ListBuffer[String])org.apache.spark.sql.DataFrame
scala> lbs.foreach(lb => lb_df.append(createDF(lb)))
scala> lb_df.foreach(_.show())
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
| 1| 2| 3| 4|
| 5| 6| 7| 8|
+---+---+---+---+
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
| a| b| c| d|
| e| f| g| h|
+---+---+---+---+
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
| q| w| e| r|
| w| x| y| z|
+---+---+---+---+