在 Spark Streaming 中使用 UDF 读取大量 XML 到 Delta 表非常慢

问题描述 投票:0回答:0

我们有一个输入文件的存储库

<rootpath>\2023\01\*\*Events*.xml
=> 这表示需要在 Spark Structured Streaming 中读取的输入 XML 文件的路径,以便事件可以解析,转换为相关的 Dataframe,然后存储到 Delta表。输入文件夹有大约 150,000 个文件,每个文件的平均大小为 2MB。

朗读如下:

spark.readStream
      .format("text")
      .option("wholeText","true")
      .option("maxFilesPerTrigger", maxFilesPerTrigger)
      .load(inputFolder)

转换是使用:

interimDF.writeStream
      .queryName("EventsDataStreaming")
      .foreachBatch(writeInBatch)
      .option("checkpointLocation",checkpointFolder)
      .trigger(Trigger.AvailableNow())
      .outputMode("append")
      .start()
      .awaitTermination()

奇怪的是,读取文件用了将近14个小时!意味着在 Databricks“Spark UI”中,前 14 小时内没有转换活动。所有转换和保存到 Delta 都是在最后 3 小时内完成的。 为了成功完成这项工作,我必须为驱动程序和工作程序节点分配一个具有 60+ GB 更大内存的大型集群。 由于驱动程序和集群节点中的内存较小,作业会因 OutofMemory 错误而中止!

这之后,我又做了一个实验。我通过按日期提供路径将工作分成更小的块。例如,

第一份工作:

<rootpath>\2023\01\01\*Events*.xml

第二份工作:

<rootpath>\2023\01\02\*Events*.xml

这些作业运行得更快(3 到 5 倍)并且还使用更小的内存(每个 14 GB)集群完成。

我想知道

  1. 我们如何避免这种读取文件的初始延迟(1 个月数据 14 小时)
  2. 如何在不拆分为更小的作业和使用更小的内存集群的情况下运行,但速度至少提高 5 倍。

我们使用的是 Spark 3.3.0.

apache-spark databricks spark-streaming spark-structured-streaming
© www.soinside.com 2019 - 2024. All rights reserved.