我是 Spark 和 Spark SQL 的新手。我有一个包含 2 列的数据集,“col1”和“col2”,“col2”最初是一个长整型序列。我想将“col2”分解为多行,以便每一行只有一个长。
我尝试了爆炸功能与使用
flatMap
和我自己的映射器功能。他们似乎有显着的性能差异。其他一切保持不变,“爆炸”功能似乎比flatMap
慢得多(数量级取决于数据大小)。为什么?
选项1:使用“爆炸”
val exploded = data.withColumn("col2", explode(col("col2")))
选项2:使用手动平面地图
case class MyPair(col1: Long, col2: Long)
def longAndLongArrayMapper(colToKeep: Long, colToExplode: Seq[Long]) = {
(for (val <- colToExplode) yield MyPair(val, colToKeep))
}
val exploded = data.flatMap{ (x: Row) =>
longAndLongArrayMapper(x.getAs[Long]("col1"), (x.getAs[Seq[Long]]("col2"))) }
explode
函数比使用
flatMap
函数分割数组慢的原因归结为这些函数的实现方式以及它们处理数据的方式。让我们来分解一下关键因素:1.
explode
:
explode
函数是Spark SQL中的DataFrame API函数,它工作在更高的抽象级别。当您调用
explode
时,Spark 会在内部将其转换为一系列转换。这些转换可能涉及 Spark SQL 查询执行中的多个阶段,其中可能包括优化、逻辑规划和物理执行计划。这些操作的开销使得
explode
变慢,尤其是对于大型数据集。
flatMap
:
flatMap
是 Spark 中 RDD API 的较低级别操作。它直接对 RDD 中的每个元素应用转换,并可以立即将数组拆分为单独的元素。它更高效,因为它跳过了与 DataFrame API 相关的许多优化和开销成本,并在较低级别执行。
explode
:由于它是 DataFrame API 的一部分(更加用户友好且具有声明性),因此
explode
通常会带来来自 Catalyst 优化器的额外开销。 Catalyst 分析查询、应用优化并生成执行计划。虽然这种优化可以提高复杂查询的性能,但它也会为更简单的任务(例如分解数组)带来额外的开销。
flatMap
:作为 RDD API 的一部分,
flatMap
直接在 RDD 上运行,无需额外的优化层。这使得拆分数组等更简单的操作更加高效,因为它更接近底层执行引擎,并且避免了额外的规划和查询优化步骤。
explode
:Spark DataFrame 操作(如
explode
)是延迟评估的。 DataFrame API 在调用操作(例如
collect
或
show
)时触发计算,但在此之前,它会花时间构建逻辑和物理执行计划。此过程可能会导致额外的延迟。
flatMap
:
flatMap
也是延迟计算的,但RDD API的较低级别执行通常会导致执行计划中的转换更快,中间步骤更少。 RDD API 没有经历那么多的优化阶段。
explode
:Spark中的DataFrame API采用了更加优化的内存管理机制,但往往需要对复杂结构进行序列化和反序列化。这可能会导致性能下降,特别是在处理嵌套数组或复杂类型时。
flatMap
:RDD API 在更简单的数据结构上运行,并且通常具有较少的序列化开销,因此在您只需将元素从一种形式转换为另一种形式(例如展平数组)时速度更快。
explode
:
explode
通常在您使用类似 SQL 的操作并需要维护架构信息时使用,或者在执行需要 Spark Catalyst 引擎优化的复杂转换时使用。它在更复杂的工作流程中很有用,但会在更简单的工作流程中引入延迟。
flatMap
:
flatMap
在处理不需要模式管理或 DataFrame 操作的转换以及想要高效执行简单操作时更有用。
explode
vs
flatMap
:
explode
。
flatMap
,特别是在不需要模式管理和复杂查询规划的场景中。