我有一个 Spark 作业,其中某些任务的记录输出为零,并且随机播放读取大小,其中某些任务有内存和磁盘溢出。有人可以帮我做什么来优化执行吗? 执行信息:repartition_cnt=3500 [数据集位于 S3 中,通过具有 298 个 DPU 的 Glue G2X 执行)
代码:
fct_ate_df.repartition(expr(s"pmod(hash(mae_id, rowsin, dep), $repartition_cnt)"))
.write
.mode("overwrite")
.format("parquet")
.bucketBy(repartition_cnt, "rowsin", "rowsin","dep")
.sortBy("rowsin","dep")
.option("path", s"s3://b222-id/data22te=$dat22et_date")
.saveAsTable(s"btemp.intte_${table_name}_${regd}")
泄漏记录
您正在使用表达式修复,我认为这就是您看到那些空分区的原因。在这种情况下,spark 在内部将使用 HashPartitioner,并且该partinioner 不保证分区相等。
由于哈希算法,您可以确定具有相同表达式值的记录将位于同一分区中,但最终可能会得到空分区或内部有例如 5 个键的分区。
在这种情况下,numPartitions 不会改变任何东西,如果一个存储桶中有很多键(因此稍后分区),最终生成的分区少于 numPartition Spark 将生成空分区,如您在示例中看到的
我认为,如果你想拥有相等的分区,你可以删除计算哈希值的表达式,只留下 $repartition_cnt
感谢 Spark 将使用 RoundRobinPartitioner 来代替,并且这个将生成 equals 分区
如果你想挖掘北斗七星,你可以看一下源代码,我认为这里是很好的起点
在这里你可以找到与重新分区相关的逻辑,无需表达:Spark源代码
在这里您可以找到用于按表达式分区的逻辑:Spark源代码