我正在使用spark sql对我的数据集运行查询。查询的结果非常小但仍然是分区的。
我想合并生成的DataFrame并按列排序行。我试过了
DataFrame result = sparkSQLContext.sql("my sql").coalesce(1).orderBy("col1")
result.toJSON().saveAsTextFile("output")
我也试过了
DataFrame result = sparkSQLContext.sql("my sql").repartition(1).orderBy("col1")
result.toJSON().saveAsTextFile("output")
输出文件以块的形式排序(即分区是有序的,但数据帧不作为整体排序)。例如,而不是
1, value
2, value
4, value
4, value
5, value
5, value
...
我明白了
2, value
4, value
5, value
-----------> partition boundary
1, value
4, value
5, value
我想在这里提几件事。 1-源代码显示orderBy语句在内部调用排序api,全局排序设置为true。因此,输出级别缺乏排序表明在写入目标时排序丢失。我的观点是,对orderBy的调用始终需要全局订单。
2-使用剧烈的聚结,如在你的情况下强制单个分区,可能是非常危险的。我建议你不要这样做。源代码表明调用coalesce(1)可能会导致上游转换使用单个分区。这将是残酷的表现。
3-您似乎希望orderBy语句可以使用单个分区执行。我不认为我同意这一说法。这将使Spark成为一个非常愚蠢的分布式框架。
如果您同意或不同意声明,请告知我们。
你是如何从输出中收集数据的呢?
也许输出实际上包含已排序的数据,但您为了从输出中读取而执行的转换/操作是导致订单丢失的原因。
orderBy将在合并后生成新分区。要拥有单个输出分区,请重新排序操作...
DataFrame result = spark.sql("my sql").orderBy("col1").coalesce(1)
result.write.json("results.json")
正如@JavaPlanet所提到的,对于非常大的数据,您不希望合并到单个分区中。它将大大降低您的并行度。