我有一个大型 PySpark 数据框,包含 2.5 亿行,只有 2 列。我正在运行 here 找到的 minHash 代码。我尝试通过
adj_sdf.write.mode("append").parquet("/output/folder/")
将生成的数据帧写入镶木地板文件。但是,我不断收到错误pod ephemeral local storage usage exceeds the total limit of containers
。我无法增加集群资源,所以我想知道是否有方法可以优化 PySpark 代码。
到目前为止,我已经完成了以下工作:
sdf = sdf.repartition(200)
hash_sdf.alias('a').join(...)
):
filtered_sdf = hash_sdf.filter(f.size(f.col('nodeSet')) > threshold)
,其中threshold = int(0.2 * n_draws)
spark.conf.set("spark.sql.shuffle.partitions", "200")
我还能做什么来将数据帧写入镶木地板文件而不遇到资源问题?
你可以尝试一些事情
使用coalesece(num_of_partition)代替重新分区,因为重新分区减少了shuffle操作,可以计算num_of_partition = Total_data_size// target_size_of_partition。
保留一些数据而不是缓存
sdf.persist(StorageLevel.DISK_ONLY
)
可以尝试在洗牌期间将数据溢出到磁盘。
spark.conf.set("spark.shuffle.spill", "true")
spark.conf.set("spark.shuffle.spill.disk", "true")