我正在从s3读取csv文件并将其作为orc写入hive表。在写作时,它写的是很多小文件。我需要合并所有这些文件。我有以下属性集:
spark.sql("SET hive.merge.sparkfiles = true")
spark.sql("SET hive.merge.mapredfiles = true")
spark.sql("SET hive.merge.mapfiles = true")
spark.sql("set hive.merge.smallfiles.avgsize = 128000000")
spark.sql("set hive.merge.size.per.task = 128000000")
除了这些配置,我尝试重新分区(1)和coalesce(1),它将合并到单个文件中,但它删除了hive表并再次创建它。
masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).orc(<HIVEtablePath>);
如果我使用Append模式而不是Overwrite,它会在每个分区下创建复制文件。
masterFile.repartition(1).write.mode(SaveMode.Append).partitionBy(<partitioncolumn>).orc(<HIVEtablePath>);
在这两种情况下,spark作业都会运行两次而在第二次执行时会失败
有没有什么方法可以使用Append模式重新分区/合并而不会在每个分区中重复部分文件?
masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).orc(<HIVEtablePath>)
.orc()方法将数据写为文件而不是触摸元信息。所以它无法覆盖HIVE中的表格。
如果您想覆盖hive表中的数据使用方法.insertInto(hive_table_name),其中hive_table_name是HIVE中表的全名(schema + table_name)
根据你的例子
masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).insertInto(hiveTableName)
也可以用元数据信息覆盖数据。具有覆盖修饰符的方法.saveAsTable(hive_table_name)也将覆盖Metastore中的数据。
masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).saveAsTable(hiveTableName)