有什么技术可以解决databricks中的倾斜数据吗?

问题描述 投票:0回答:1

我创建了倾斜数据来测试加盐方法,并尝试了三种不同的解决方案,但没有一个实现了预期的结果并显着改善了运行时间。您能指导我有效解决此问题的最佳方法吗?

import pyspark.sql.functions as F
df1 = spark.range(300_000_000).withColumn('value',F.when(F.rand() < 0.6,1).otherwise((F.rand() * 100).cast('int)).drip('id')
df2 = spark.range(200_000_000).withColumn('value',F.when(F.rand() < 0.2,4).otherwise((F.rand() * 100).cast('int)).drip('id')
final_df = df1.join(df2,on='value',how='inner')
finaldf.write.format('parquet).save(path)

流程1

启用 AQE 和其他设置,例如

skewjoin,enabled=true,coalescePartitions.enabled=True
shuffle.partition.=auto

运行了20多分钟,我手动取消了作业。

流程2

盐腌技术。禁用 AQE 并添加随机分区大小 = 1000

df1 = (df1.withColumn('salt_numbers',F.expr('sequence(0,1)'))
   .withColum('salt',F.explode('salt_numbers'))
   .drop('salt_numbers))
df2 = (df2.withColumn('salt_numbers',F.expr('sequence(0,3)'))
   .withColum('salt',F.explode('salt_numbers'))
   .drop('salt_numbers))

现在两个数据集具有相同的大小 (600_000_00) 并添加了连接列 - salt。 on=['值','盐'] 。 它运行了超过 30 分钟并手动取消了作业。

流程3

加盐技术:启用 AQE

df1 = (df1.withColumn('salt_numbers',F.expr('sequence(0,1)'))
   .withColum('salt',F.explode('salt_numbers'))
   .drop('salt_numbers))
df2 = (df2.withColumn('salt_numbers',F.expr('sequence(0,3)'))
   .withColum('salt',F.explode('salt_numbers'))
   .drop('salt_numbers))

现在两个数据集具有相同的大小 (600_000_00) 并添加了连接列 - salt。 on=['值','盐'] 。 也花了类似的 30 分钟并手动取消了作业。

注意:Databricks 集群大小为 32 GB、4 核和 8 个工作线程。 请分享您对我们如何高效地开展这项工作的想法。

sql pyspark hive databricks aws-databricks
1个回答
0
投票

验证以下 Spark 配置属性:

  1. spark.sql.adaptive.skewJoin.enabled(默认值:true): 启用或禁用自适应倾斜连接处理。
  2. spark.sql.adaptive.skewJoin.skewedPartitionFactor(默认值:5): 用于确定分区是否倾斜的乘数。 如果分区的大小超过分区大小中值的 5 倍,则该分区被视为倾斜。
  3. spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes(默认值:256MB): 被视为倾斜的分区的最小大小阈值。

在以下情况下,分区被识别为倾斜: 其大小超过 skewedPartitionFactor 与分区大小中位数的乘积。 它的大小大于 skewedPartitionThresholdInBytes。

© www.soinside.com 2019 - 2024. All rights reserved.