我有一个巨大的 PySpark 数据框,其中包含 1.5B 行,包括列
fieldA
。我有一个包含 880 万个唯一 fieldA
值的列表,我想从 1.5B 行中过滤掉它们。但是,我认为由于数据量很大,我不断收到诸如 StackOverflowError
或 OutOfMemoryError
之类的错误。
我尝试将 8.8M 列表拆分为 20K 值的较小列表,并将 1.5B 数据帧拆分为每个 15M 行的较小数据帧。然后,对于 15M 行的每个数据帧,连续(循环)过滤掉不同的 20K
fieldA
值 (temp_df = temp_df.filter(~col('fieldA').isin(fieldA_part_list))
),直到过滤掉所有 8.8M 值,然后将最终的 temp_df
写入 parquet 文件。对接下来的 15M 行数据帧重复此操作。然而,我认为这导致了数百个 .filter()
,当我尝试在第一个 15M 数据帧上写入镶木地板文件时,这可能就是给我 StackOverflowError
的原因。
然后我尝试从每个 15M 数据帧中过滤掉完整的 880 万个值。对于每个 15M 数据帧,我会将过滤结果写入 parquet 文件。然而,当我尝试写入 parquet 文件时,我在第一个 15M 数据帧上得到了
OutOfMemoryError
。
如何以有效的方式从 1.5B 行数据帧中过滤掉与任何 8.8M
fieldA
值匹配的行?
您可以采取一些措施来提高代码的内存效率。
在开始时仅选择您需要的列:这将有助于减少
temp_df
DataFrame 的内存消耗。
如果元素列表太大,请使用
join
代替isin
:这可能会在过滤时造成开销。我的建议是将元素列表转换为 DataFrame,并使用像 leftanti
这样的连接技术来获取 temp_df
中不存在于 fieldA_part_list
中的所有元素。
尝试 DataFrame. exceptAll (pyspark.sql.DataFrame. exceptAll): 这也是一个很好的本机 pyspark 操作,您可以尝试它基本上返回
temp_df
中不在 fieldA_part_list
中的所有行(考虑到现在是一个数据框)。
以下是一些可以用作基准的示例:
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
fieldA_part_list = ["someValue1", "someValue2", "someValue3"]
# transform the fieldA_list as a dataframe
fieldA_part_df = spark.createDataFrame(fieldA_part_list, StringType()).toDF("value")
join_condition_expr = col("fieldA") == col("value")
# Perform leftanti to get only temp_df records
temp_df = temp_df.join(fieldA_part_df, on= join_condition_expr, how="leftanti")
# Alternatively using exceptAll - use one or other given memory pessure is on
another_result_df = temp_df.exceptAll(fieldA_part_df)
出于好奇,您是使用本地计算机还是 Databricks 集群来执行这些操作?