Spark DataFrame 中由于在特定分区上过滤时过滤器表达式过多而出现 StackOverflowError

问题描述 投票:0回答:1

我正在开发一个 Spark (Scala) 应用程序,我需要过滤掉特定分区(嵌套):

region_name/audit_submission_date_hr_min
region_name
是顶层分区。
audit_submission_date_hr_min
是每 15 分钟创建的嵌套分区。因此,我在大型 DataFrame 上应用了一系列过滤器表达式(
DataFrame
上大约有 3000 个过滤器)。

目的是只处理尚未处理的分区!

但是,我在执行作业时遇到了 java.lang.StackOverflowError 。

一级分区:

enter image description here

二级分区:

enter image description here

代码示例-

val df = inputDf.filter(((region_name = emea) AND (audit_submission_date_hr_min = 2024-10-15-00-54-00)) OR ((region_name = africa) AND (audit_submission_date_hr_min = 2024-10-08-23-14-00)))
like ((region_name = emea) AND (audit_submission_date_hr_min = 2024-10-15-00-54-00)) OR ((region_name = africa) AND (audit_submission_date_hr_min = 2024-10-08-23-14-00)) 
        OR ((region_name = africa) AND (audit_submission_date_hr_min = 2024-10-08-04-29-00))) OR ((region_name = africa) AND (audit_submission_date_hr_min = 2024-10-08-17-14-00)))
        OR ((region_name = africa) AND (audit_submission_date_hr_min = 2024-10-13-17-14-00))) OR ((region_name = africa) AND (audit_submission_date_hr_min = 2024-10-14-15-44-00)))
        .
        . its 3000+ filter will be applied on 1 dataframe.
        .
        OR ((region_name = africa) AND (audit_submission_date_hr_min = 2024-10-13-17-14-00))) OR ((region_name = africa) AND (audit_submission_date_hr_min = 2024-10-14-15-44-00)))

错误

2024-10-16 20:25:28,899 ERROR ApplicationMaster [Driver]: User class threw exception: java.lang.StackOverflowError 
java.lang.StackOverflowError
    at scala.collection.generic.GenericTraversableTemplate.genericBuilder(GenericTraversableTemplate.scala:72)
    ...
    at org.apache.spark.sql.catalyst.expressions.Expression.references(Expression.scala:126)
    ...

我怀疑问题的出现是由于我在 DataFrame 上应用了大量的过滤表达式。使用较少的过滤器,作业可以正常运行,但是当我增加数量时,就会发生堆栈溢出。

问题 1. - 为什么会出现 StackOverflowError ?

问题 2. - Spark 中是否有更有效的方法来处理大量过滤表达式?我可以使用什么策略来避免这种情况 堆栈溢出错误?

问题 3.-在构建执行计划时,一次可以在 1 个数据帧上应用的过滤器数量有什么限制?

apache-spark databricks query-optimization
1个回答
0
投票

问题 1. - 为什么会出现 StackOverflowError ?

因为 Spark 创建的计划(使用其名为 Catalyst 的优化器)是一棵树。例如,如果您采取

df = df.filter(
    (col("region_name") == "A") & (col("timestamp") == "2024-10-08-23-14-00")
    | (col("region_name") == "B") & (col("timestamp") == "2024-10-15-00-54-00")
    & (col("region_name") == "C") & (col("timestamp") == "2024-10-08-23-02-00")
    | (col("region_name") == "D") & (col("timestamp") == "2024-10-15-00-54-01")

然后做

df.explain(True)

你会得到一棵这样的树

(
    (
        (
            (region_name#4 = A) AND
            (timestamp#5 = 2024-10-08-23-14-00)
        ) OR 
        (
            (
                (
                    (region_name#4 = B) AND
                    (timestamp#5 = 2024-10-15-00-54-00)
                ) AND 
                (region_name#4 = C)
            ) AND
            (timestamp#5 = 2024-10-08-23-02-00)
        )
    ) OR
    (
        (region_name#4 = D) AND
        (timestamp#5 = 2024-10-15-00-54-01)
    )
)

因此,如果像你所说的那样有 2000 个子句,很可能会导致堆栈溢出,因为树的深度会增加。

问题 2. - Spark 中是否有更有效的方法来处理大量过滤表达式?我可以使用什么策略来避免这个 StackOverflowError ?

如果您有很多条件需要申请,加入可能是一种选择。具体来说,您的表和(更小的)“条件表”之间的广播连接会广播到执行器节点,这会更有效。

它可能看起来像这样

data = [("emea", "2024-10-15-00-54-00", "some_data_1"),
        ("africa", "2024-10-08-23-14-00", "some_data_2"),
        ("africa", "2024-10-08-04-29-00", "some_data_3"),
        ("africa", "2024-10-13-17-14-00", "some_data_4"),
        ("apac", "2024-10-13-17-14-00", "some_data_5")]

columns = ["region_name", "timestamp", "extra_column"]

input_df = spark.createDataFrame(data, columns)

filter_data = [("emea", "2024-10-15-00-54-00"),
               ("africa", "2024-10-08-23-14-00"),
               ("africa", "2024-10-08-04-29-00")]

filter_columns = ["region_name", "timestamp"]

filter_df = spark.createDataFrame(filter_data, filter_columns)

final_filtered_df = input_df.join(
    broadcast(filter_df), 
    ["region_name", "timestamp"], 
    "inner"
)

仅当您的条件采用

(... & ... & ... & ...) | (... & ... & ... & ...) | (... & ... & ... & ...)
的特定格式时,这才有效。原始表中与条件表中任何列匹配的任何行都会被连接选择。

问题 3.-在构建执行计划时,一次可以在 1 个数据帧上应用的过滤器数量有什么限制?

我不知道有限制,但我相信这取决于您的执行环境。

© www.soinside.com 2019 - 2024. All rights reserved.