Pyspark Databricks 优化技术

问题描述 投票:0回答:1

在我的代码片段下方。

spark.read.table('schema.table_1').createOrReplaceTempView('d1') # 400 million records
spark.read.table('schema.table_2').createOrReplaceTempView('d1') $ 300 million records

stmt = "select * from d1 inner join d2 on d1.id = d2.id"

(
    spark.sql(stmt).write('delta').mode('overwrite').saveAsTable('schema.table_3') # result count : 800 million records
    
)

集群大小为(32 GB内存,4核和6个工作线程)

有向无环图 enter image description here

来自 DAG 图片。

  1. 阶段-219 - 需要1小时
  2. 阶段-216和217被跳过

问题是

  1. 阶段-219正在引用写操作或者正在执行sql语句并尝试将结果写入目标表-
  2. 如何判断是连接操作花费了更多时间还是将结果写入目标表花费了更多时间。
  3. 基于DAG,stage-218需要40分钟。
apache-spark pyspark aws-databricks
1个回答
0
投票

首先检查 Spark 中 stage 218 的随机大小,并检查密钥分配中的倾斜。因为高shuffle和join操作需要更多时间。

阶段219:在写入任务上花费更多时间(低shuffle读/写但高I/O),因为写操作较慢。

最佳优化技术:

  1. 使用broadcast Joins避免shuffle,Repartitioning甚至数据分布。
  2. 对于 优化写入操作 使用 Delta/Parquet 这样的文件格式可以有效地将数据从源写入到目标,并增加输出分区的数量。
  3. 使用 parquet 文件进行压缩并减少存储和查询优化扫描大小。
  4. 根据上述数据大小使用分区。这将有助于提高优化速度。
  5. 优化表考虑小列文件以减少扫描开销。
© www.soinside.com 2019 - 2024. All rights reserved.