无法过滤掉PySpark中巨大数据集中的数据帧

问题描述 投票:0回答:1

我有一个巨大的 PySpark 数据框,其中包含 1.5B 行,包括列

fieldA
。我有一个包含 880 万个唯一
fieldA
值的列表,我想从 1.5B 行中过滤掉它们。但是,我认为由于数据量很大,我不断收到诸如
StackOverflowError
OutOfMemoryError
之类的错误。

我尝试将 8.8M 列表拆分为 20K 值的较小列表,并将 1.5B 数据帧拆分为每个 15M 行的较小数据帧。然后,对于 15M 行的每个数据帧,连续(循环)过滤掉不同的 20K

fieldA
值 (
temp_df = temp_df.filter(~col('fieldA').isin(fieldA_part_list))
),直到过滤掉所有 8.8M 值,然后将最终的
temp_df
写入 parquet 文件。对接下来的 15M 行数据帧重复此操作。然而,我认为这导致了数百个
.filter()
,当我尝试在第一个 15M 数据帧上写入镶木地板文件时,这可能就是给我
StackOverflowError
的原因。

然后我尝试从每个 15M 数据帧中过滤掉完整的 880 万个值。对于每个 15M 数据帧,我会将过滤结果写入 parquet 文件。然而,当我尝试写入 parquet 文件时,我在第一个 15M 数据帧上得到了

OutOfMemoryError

如何以有效的方式从 1.5B 行数据帧中过滤掉与任何 8.8M

fieldA
值匹配的行?

python pandas pyspark out-of-memory
1个回答
0
投票

您可以采取一些措施来提高代码的内存效率。

  • 在开始时仅选择您需要的列:这将有助于减少

    temp_df
    DataFrame 的内存消耗。

  • 如果元素列表太大,请使用

    join
    代替
    isin
    这可能会在过滤时造成开销。我的建议是将元素列表转换为 DataFrame,并使用像
    leftanti
    这样的连接技术来获取
    temp_df
    中不存在于
    fieldA_part_list
    中的所有元素。

  • 尝试 DataFrame. exceptAll (pyspark.sql.DataFrame. exceptAll): 这也是一个很好的本机 pyspark 操作,您可以尝试它基本上返回

    temp_df
    中不在
    fieldA_part_list
    中的所有行(考虑到现在是一个数据框)。

以下是一些可以用作基准的示例:

from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

fieldA_part_list = ["someValue1", "someValue2", "someValue3"]  

# transform the fieldA_list as a dataframe
fieldA_part_df = spark.createDataFrame(fieldA_part_list, StringType()).toDF("value")

join_condition_expr = col("fieldA") == col("value")

# Perform leftanti to get only temp_df records
temp_df = temp_df.join(fieldA_part_df, on= join_condition_expr, how="leftanti")

# Alternatively using exceptAll - use one or other given memory pessure is on
another_result_df = temp_df.exceptAll(fieldA_part_df)

出于好奇,您是使用本地计算机还是 Databricks 集群来执行这些操作?

© www.soinside.com 2019 - 2024. All rights reserved.