我使用sql spark来读取和写入hdfs中的文件,使用以下代码:
val df = spark.read.parquet("D:/resources/input/address/year_month_day=2018-07-02")
val df.write.mode("overwrite").parquet("D:/resources/output/t_kcdo_person")
写作的结果是许多小files。根据我的学习,不建议文件的大小小于128 MB。我一直在寻找方法来减少文件数量但是尺寸更大,我找到了函数df.coalesce,但我有一个问题是建议使用这个函数,因为它会避免并行性。
这是Spark中的一个已知问题。无论数据的实际大小如何,每个分区都将输出一个文件。 coalesce
也不是银弹 - 你需要非常小心新的分区数量 - 太小而应用程序将是OOM。您可以考虑在运行时计算合并值,但在大多数情况下,这意味着将数据保存到磁盘,获取实际大小,然后再次读取并将coalesce
调整到最佳大小。
在您的简单示例中,您可以预先获得实际输入大小。但是对于一般情况,有像FileCrush这样的工具可以对你的输出(小文件)进行操作,并将它们合并到更少的文件中。但它现在已经老了,并且基于map-reduce(虽然仍然有效)。在我的团队中,我们构建了一个非常简单的Spark版本,您可以轻松创建自己的版本。如果这样做,请记住在计算最佳分区数时考虑comperssion编解码器。
另外,在使用coalesce
时,你对于担心并行性是正确的。如果这成为一个问题,并且你有一些计算应该在coalesce
之前运行更高级别的并行性,你可以使用类似spark.createDataFrame(df.rdd, df.schema)
的东西来创建一个新的数据帧,并避免将coalesce
推得太低。但是,这需要考虑重要的意义。