我有巨大的数据框,其中包含几列,其中之一是callDate(DateType)。我想将该数据帧保存到S3的实木复合地板上,并通过此call_date列对其进行分区。这将是我们项目的初始负载(包含历史数据),然后在生产中,一天结束后,它应该添加新分区,而不删除旧分区。
在我省略.partitionBy方法的情况下,作业在12分钟内完成。动作示例:
allDataDF.write.mode("overwrite").parquet(resultPath)
另一方面,当我这样做时:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
allDataDF.write.mode("overwrite").partitionBy("call_date").parquet(resultPath)
作业在30分钟内未完成。我在partitionBy之前没有进行任何分区,所以我猜速度应该有点相似,因为每个执行者都应该将其自己的分区保存到特定日期?我在这里想念什么?
正如您正确地说的那样,使用partitionBy
,每个任务都将其数据写入所有分区。因此,对于N个任务和P个分区,写入文件系统的files总数可以增加到N x P
。由于默认情况下,Spark驱动程序按顺序提交每个文件,因此按比例花费更长的时间才能完成写入。
对于Spark <2.4尝试使用spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
,看看是否有帮助。
对于Spark> = S3上的2.4在AWS的较新Spark版本中,众所周知,EMRFS S3-optimized committer解决了对S3的缓慢写入。我对此没有经验,但是他们的结果published看起来非常令人鼓舞。