我们有一个输入文件的存储库
<rootpath>\2023\01\*\*Events*.xml
=> 这表示需要在 Spark Structured Streaming 中读取的输入 XML 文件的路径,以便事件可以解析,转换为相关的 Dataframe,然后存储到 Delta表。输入文件夹有大约 150,000 个文件,每个文件的平均大小为 2MB。
朗读如下:
spark.readStream
.format("text")
.option("wholeText","true")
.option("maxFilesPerTrigger", maxFilesPerTrigger)
.load(inputFolder)
转换是使用:
interimDF.writeStream
.queryName("EventsDataStreaming")
.foreachBatch(writeInBatch)
.option("checkpointLocation",checkpointFolder)
.trigger(Trigger.AvailableNow())
.outputMode("append")
.start()
.awaitTermination()
奇怪的是,读取文件用了将近14个小时!意味着在 Databricks“Spark UI”中,前 14 小时内没有转换活动。所有转换和保存到 Delta 都是在最后 3 小时内完成的。 为了成功完成这项工作,我必须为驱动程序和工作程序节点分配一个具有 60+ GB 更大内存的大型集群。 由于驱动程序和集群节点中的内存较小,作业会因 OutofMemory 错误而中止!
这之后,我又做了一个实验。我通过按日期提供路径将工作分成更小的块。例如,
第一份工作:
<rootpath>\2023\01\01\*Events*.xml
第二份工作:
<rootpath>\2023\01\02\*Events*.xml
这些作业运行得更快(3 到 5 倍)并且还使用更小的内存(每个 14 GB)集群完成。
我想知道
我们使用的是 Spark 3.3.0.