如何有效地将ListBuffer [ListBuffer [String]]转换为多个数据帧并使用Spark Scala写入它们

问题描述 投票:1回答:1

[我正在尝试使用Scala和Spark解析一组XML文件。我从文件中获取了'n'个数据帧的数据。(即,数据帧的数量没有变化,只有文件数量有所变化)

我正在解析一组XML文件,并将数据存储在ListBuffer[ListBuffer[String]]中。每个ListBuffer[String]都包含一个数据帧的数据。例如:

ListBuffer[
    ListBuffer["1|2|3|4","5|6|7|8"],
    ListBuffer["a|b|c|d","e|f|g|h"],
    ListBuffer["q|w|e|r","w|x|y|z"]
]

这将创建3个数据框:

Dataframe1:
 col1  col2  col3  col4
   1     2     3     4
   5     6     7     8

以及类似的其他2个数据框。

我无法直接将XML转换为Dataframe,因为在制作Dataframe之前,数据中需要进行很多自定义处理。

我正在使用以下代码将ListBuffer转换为Dataframe:

finalListBuffer.foreach{ data =>

    columns = FunctionToReturnColumnsList()
    val schema = StructType(columns.map(field => StructField(field, StringType, true)))
    val dataRDD: RDD[Row] = sparkSess.sparkContext.parallelize(data.toStream.map(l => Row.fromSeq(l.split("|", -1))))
    val df = sparkSess.createDataFrame(dataRDD, schema)
    ...
}

在此步骤之后,对每个数据帧执行一些操作((某些操作具有数据帧之间的相关性,因此我不能只处理一个数据帧,然后写入),最后使用以下代码写入数据帧:

df.repartition(1).write.mode("Overwrite").option("multiline", "true").option("delimiter", "\u0017").csv(filename)

在执行这些步骤时,当输入文件大小很大时,我会遇到两个问题:

1)创建数据帧时,超出了GC开销限制。(创建dataRDD变量的步骤)

2)写入df时发生火花心跳超时错误。>>

如何解决这些问题?

我最初考虑使用ListBuffer[RDD[String]],而不是ListBuffer[ListBuffer[String]]

但是最多可以有100万个文件,每个文件最多可以有10-20个df条目。我正在做的是,我列出所有文件,并逐个处理每个文件,并将其结果附加到主ListBuffer中。因此,如果我使用的是RDD,则必须为每个文件使用union,这可能会很昂贵。还有什么可以做的?

[我正在尝试使用Scala和Spark解析一组XML文件。我从文件中获取了'n'个数据帧的数据。(即,数据帧的数量不变,只有文件数量不同),我正在解析一组XML ...

scala apache-spark xml-parsing apache-spark-sql scala-collections
1个回答
0
投票
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import scala.collection.mutable.ListBuffer
import scala.collection.mutable.ListBuffer

scala> val lbs = ListBuffer(
     |     ListBuffer("1|2|3|4","5|6|7|8"),
     |     ListBuffer("a|b|c|d","e|f|g|h"),
     |     ListBuffer("q|w|e|r","w|x|y|z")
     | )
lbs: scala.collection.mutable.ListBuffer[scala.collection.mutable.ListBuffer[String]] = ListBuffer(ListBuffer(1|2|3|4, 5|6|7|8), ListBuffer(a|b|c|d, e|f|g|h), ListBuffer(q|w|e|r, w|x|y|z))


scala> val schema = StructType(Seq(StructField("c1", StringType, true),StructField("c2", StringType, true),StructField("c3", StringType, true),StructField("c4", StringType, true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(c1,StringType,true), StructField(c2,StringType,true), StructField(c3,StringType,true), StructField(c4,StringType,true))

scala> var lb_df: ListBuffer[DataFrame] = ListBuffer()
lb_df: scala.collection.mutable.ListBuffer[org.apache.spark.sql.DataFrame] = ListBuffer()

scala> def createDF(lb: ListBuffer[String]) = spark.createDataFrame(spark.sparkContext.parallelize(lb.toSeq).map(_.toString.split("\\|")).map(Row(_: _*)), schema)
createDF: (lb: scala.collection.mutable.ListBuffer[String])org.apache.spark.sql.DataFrame


scala> lbs.foreach(lb => lb_df.append(createDF(lb)))

scala> lb_df.foreach(_.show())
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
|  1|  2|  3|  4|
|  5|  6|  7|  8|
+---+---+---+---+

+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
|  a|  b|  c|  d|
|  e|  f|  g|  h|
+---+---+---+---+

+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
|  q|  w|  e|  r|
|  w|  x|  y|  z|
+---+---+---+---+
© www.soinside.com 2019 - 2024. All rights reserved.