在超出集群资源的大型数据帧上优化 PySpark 代码

问题描述 投票:0回答:1

我有一个大型 PySpark 数据框,包含 2.5 亿行,只有 2 列。我正在运行 here 找到的 minHash 代码。我尝试通过

adj_sdf.write.mode("append").parquet("/output/folder/")
将生成的数据帧写入镶木地板文件。但是,我不断收到错误
pod ephemeral local storage usage exceeds the total limit of containers
。我无法增加集群资源,所以我想知道是否有方法可以优化 PySpark 代码。

到目前为止,我已经完成了以下工作:

  1. 在运行 minHash 函数之前对数据帧进行分区:
    sdf = sdf.repartition(200)
  2. 在涉及两个连接的最后一步之前过滤掉不太可能共享许多哈希值的对 (
    hash_sdf.alias('a').join(...)
    ):
    filtered_sdf = hash_sdf.filter(f.size(f.col('nodeSet')) > threshold)
    ,其中
    threshold = int(0.2 * n_draws)
  3. 设置shuffle分区数量:
    spark.conf.set("spark.sql.shuffle.partitions", "200")

我还能做什么来将数据帧写入镶木地板文件而不遇到资源问题?

python dataframe pyspark optimization parquet
1个回答
0
投票

你可以尝试一些事情

  1. 使用coalesece(num_of_partition)代替重新分区,因为重新分区减少了shuffle操作,可以计算num_of_partition = Total_data_size// target_size_of_partition。

  2. 保留一些数据而不是缓存

    sdf.persist(StorageLevel.DISK_ONLY

  3. 可以尝试在洗牌期间将数据溢出到磁盘。

    spark.conf.set("spark.shuffle.spill", "true")

    spark.conf.set("spark.shuffle.spill.disk", "true")

© www.soinside.com 2019 - 2024. All rights reserved.