我创建了倾斜数据来测试加盐方法,并尝试了三种不同的解决方案,但没有一个实现了预期的结果并显着改善了运行时间。您能指导我有效解决此问题的最佳方法吗?
import pyspark.sql.functions as F
df1 = spark.range(300_000_000).withColumn('value',F.when(F.rand() < 0.6,1).otherwise((F.rand() * 100).cast('int)).drip('id')
df2 = spark.range(200_000_000).withColumn('value',F.when(F.rand() < 0.2,4).otherwise((F.rand() * 100).cast('int)).drip('id')
final_df = df1.join(df2,on='value',how='inner')
finaldf.write.format('parquet).save(path)
流程1
启用 AQE 和其他设置,例如
skewjoin,enabled=true,coalescePartitions.enabled=True
和 shuffle.partition.=auto
运行了20多分钟,我手动取消了作业。
流程2
盐腌技术。禁用 AQE 并添加随机分区大小 = 1000
df1 = (df1.withColumn('salt_numbers',F.expr('sequence(0,1)'))
.withColum('salt',F.explode('salt_numbers'))
.drop('salt_numbers))
df2 = (df2.withColumn('salt_numbers',F.expr('sequence(0,3)'))
.withColum('salt',F.explode('salt_numbers'))
.drop('salt_numbers))
现在两个数据集具有相同的大小 (600_000_00) 并添加了连接列 - salt。 on=['值','盐'] 。 它运行了超过 30 分钟并手动取消了作业。
流程3
加盐技术:启用 AQE
df1 = (df1.withColumn('salt_numbers',F.expr('sequence(0,1)'))
.withColum('salt',F.explode('salt_numbers'))
.drop('salt_numbers))
df2 = (df2.withColumn('salt_numbers',F.expr('sequence(0,3)'))
.withColum('salt',F.explode('salt_numbers'))
.drop('salt_numbers))
现在两个数据集具有相同的大小 (600_000_00) 并添加了连接列 - salt。 on=['值','盐'] 。 也花了类似的 30 分钟并手动取消了作业。
注意:Databricks 集群大小为 32 GB、4 核和 8 个工作线程。 请分享您对我们如何高效地开展这项工作的想法。
验证以下 Spark 配置属性:
在以下情况下,分区被识别为倾斜: 其大小超过 skewedPartitionFactor 与分区大小中位数的乘积。 它的大小大于 skewedPartitionThresholdInBytes。