这更多的是对我的理解的澄清。我收到错误“无法广播大于 8GB 的表错误”
这是我的伪代码:
val riskDF = broadcast(someDF) // i want riskDF to be broadcasted as part of join, as it is small < 1GB
val processDF1 = ProcessAndJoin(riskDF) // read from another source and join with riskDF
val processDF2 = ProcessAndJoin(riskDF) // read from another source and join with riskDF
val processDF3 = ProcessAndJoin(riskDF) // read from another source and join with riskDF
// processDF1、processDF2 和 processDF3 的并集。然后将输出写入存储桶。
我有spark.sql.adaptive.enabled=true。
理解与问题
我以为riskDF只会广播一次。但是基于DAG,它是多次读取和广播的。
我认为根据此代码,为“广播”“点击”设置了固定的 8GB 限制参考 我不确定是否为一次广播连接命中设置了 8GB 限制,还是为整个管道的所有广播连接的累积设置了 8GB 限制
我不认为这个特定错误是关于spark.sql.autoBroadcastJoinThreshold(在我的例子中,我没有设置它,它保留为默认值)。根据文档,“广播”提示独立于“spark.sql.autoBroadcastJoinThreshold”。
我的理解对吗?
我正在尝试设置spark.sql.adaptive.enabled=false并重新运行用例。很快就会更新到这里。
我了解错误的原因。
我们正在 dataproc serverless 上进行实验,它设置了 Spark.sql.autoBroadcastJoinThreshold=16g。因此,连接会导致广播。但在 Spark 代码中,here,正在检查 8GB 的限制。 这次检查失败,因为广播的数据明显超过了8GB。
理想情况下,spark 应该有一个默认的 max.spark.sql.autoBroadcastJoinThreshold (=8gb)。任何更高的值都应该重置为 8GB。