作为 Databricks 笔记本的一部分,我们正在尝试运行 sql,将大约 15 个增量表与 1 个事实和大约 14 个维度表连接起来。 Joins 产生的数据约为 2 亿条记录。
当我们在做的时候 df = Spark.sql('.....') --> 这个sql是大约15个Delta表的连接 df.count() 它运行了大约 5 到 7 小时,并给出错误消息: org.apache.spark.SparkException:作业因阶段失败而中止:ShuffleMapStage 1504(LexicalThreadLocal.scala:63 处的 $anonfun$withThreadLocalCaptured$5)已失败最大允许次数:4
我们已经跑了 优化 Dim1 ZORDER BY (Dim1SK); 分析表 Dim1 计算所有列的统计数据; 对所有 15 个增量表完成上述操作并运行 sql。
有哪些更好的方法可以解决这个问题。
尝试通过将 Fact 和 Dim1、Dim2、Dim3、Dim4 连接到 DataFrame 中来分成多个 sql 然后将此 DataFrame 与 Dim5、6、7、8 等连接。 即使这也需要更多时间。
广播连接,也称为映射端连接,是一种跨集群节点分布连接操作的连接执行策略。 将大型表(事实表)与相对较小的表(维度表)连接起来非常高效。 在这种方法中,较小的数据集被广播到集群内的所有执行器。 然后,每个执行器对广播的数据集进行哈希处理,并与较大的数据集执行连接操作。
我尝试过以下方法:
df_joined = df_fact.join(F.broadcast(df_dim1), "id", "left")
.join(F.broadcast(df_dim2), "id", "left")
结果:
id value dim1_value dim2_value
1 100 Dim1_Value1 Dim2_Value1
2 200 Dim1_Value2 null
3 300 null Dim2_Value2
4 400 null null