将Spark DataFrame保存到按日期划分的Parquet中

问题描述 投票:1回答:1

我有巨大的数据框,其中包含几列,其中之一是callDate(DateType)。我想将该数据帧保存到S3的实木复合地板上,并通过此call_date列对其进行分区。这将是我们项目的初始负载(包含历史数据),然后在生产中,一天结束后,它应该添加新分区,而不删除旧分区。

在我省略.partitionBy方法的情况下,作业在12分钟内完成。动作示例:

allDataDF.write.mode("overwrite").parquet(resultPath)

另一方面,当我这样做时:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
allDataDF.write.mode("overwrite").partitionBy("call_date").parquet(resultPath)

作业在30分钟内未完成。我在partitionBy之前没有进行任何分区,所以我猜速度应该有点相似,因为每个执行者都应该将其自己的分区保存到特定日期?我在这里想念什么?

apache-spark apache-spark-sql parquet aws-glue
1个回答
0
投票

正如您正确地说的那样,使用partitionBy,每个任务都将其数据写入所有分区。因此,对于N个任务和P个分区,写入文件系统的files总数可以增加到N x P。由于默认情况下,Spark驱动程序按顺序提交每个文件,因此按比例花费更长的时间才能完成写入。

对于Spark <2.4尝试使用spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2,看看是否有帮助。

对于Spark> = S3上的2.4在AWS的较新Spark版本中,众所周知,EMRFS S3-optimized committer解决了对S3的缓慢写入。我对此没有经验,但是他们的结果published看起来非常令人鼓舞。

© www.soinside.com 2019 - 2024. All rights reserved.