我正在开发一个 Spark (Scala) 应用程序,我需要过滤掉特定分区(嵌套):
region_name/audit_submission_date_hr_min
。 region_name
是顶层分区。 audit_submission_date_hr_min
是每 15 分钟创建的嵌套分区。因此,我在大型 DataFrame 上应用了一系列过滤器表达式(DataFrame
上大约有 3000 个过滤器)。
目的是只处理尚未处理的分区!
但是,我在执行作业时遇到了 java.lang.StackOverflowError 。
一级分区:
二级分区:
代码示例-
val df = inputDf.filter(((region_name = emea) AND (audit_submission_date_hr_min = 2024-10-15-00-54-00)) OR ((region_name = africa) AND (audit_submission_date_hr_min = 2024-10-08-23-14-00)))
like ((region_name = emea) AND (audit_submission_date_hr_min = 2024-10-15-00-54-00)) OR ((region_name = africa) AND (audit_submission_date_hr_min = 2024-10-08-23-14-00))
OR ((region_name = africa) AND (audit_submission_date_hr_min = 2024-10-08-04-29-00))) OR ((region_name = africa) AND (audit_submission_date_hr_min = 2024-10-08-17-14-00)))
OR ((region_name = africa) AND (audit_submission_date_hr_min = 2024-10-13-17-14-00))) OR ((region_name = africa) AND (audit_submission_date_hr_min = 2024-10-14-15-44-00)))
.
. its 3000+ filter will be applied on 1 dataframe.
.
OR ((region_name = africa) AND (audit_submission_date_hr_min = 2024-10-13-17-14-00))) OR ((region_name = africa) AND (audit_submission_date_hr_min = 2024-10-14-15-44-00)))
错误
2024-10-16 20:25:28,899 ERROR ApplicationMaster [Driver]: User class threw exception: java.lang.StackOverflowError
java.lang.StackOverflowError
at scala.collection.generic.GenericTraversableTemplate.genericBuilder(GenericTraversableTemplate.scala:72)
...
at org.apache.spark.sql.catalyst.expressions.Expression.references(Expression.scala:126)
...
我怀疑问题的出现是由于我在 DataFrame 上应用了大量的过滤表达式。使用较少的过滤器,作业可以正常运行,但是当我增加数量时,就会发生堆栈溢出。
问题 1. - 为什么会出现 StackOverflowError ?
问题 2. - Spark 中是否有更有效的方法来处理大量过滤表达式?我可以使用什么策略来避免这种情况 堆栈溢出错误?
问题 3.-在构建执行计划时,一次可以在 1 个数据帧上应用的过滤器数量有什么限制?
问题 1. - 为什么会出现 StackOverflowError ?
因为 Spark 创建的计划(使用其名为 Catalyst 的优化器)是一棵树。例如,如果您采取
df = df.filter(
(col("region_name") == "A") & (col("timestamp") == "2024-10-08-23-14-00")
| (col("region_name") == "B") & (col("timestamp") == "2024-10-15-00-54-00")
& (col("region_name") == "C") & (col("timestamp") == "2024-10-08-23-02-00")
| (col("region_name") == "D") & (col("timestamp") == "2024-10-15-00-54-01")
然后做
df.explain(True)
你会得到一棵这样的树
(
(
(
(region_name#4 = A) AND
(timestamp#5 = 2024-10-08-23-14-00)
) OR
(
(
(
(region_name#4 = B) AND
(timestamp#5 = 2024-10-15-00-54-00)
) AND
(region_name#4 = C)
) AND
(timestamp#5 = 2024-10-08-23-02-00)
)
) OR
(
(region_name#4 = D) AND
(timestamp#5 = 2024-10-15-00-54-01)
)
)
因此,如果像你所说的那样有 2000 个子句,很可能会导致堆栈溢出,因为树的深度会增加。
问题 2. - Spark 中是否有更有效的方法来处理大量过滤表达式?我可以使用什么策略来避免这个 StackOverflowError ?
如果您有很多条件需要申请,加入可能是一种选择。具体来说,您的表和(更小的)“条件表”之间的广播连接会广播到执行器节点,这会更有效。
它可能看起来像这样
data = [("emea", "2024-10-15-00-54-00", "some_data_1"),
("africa", "2024-10-08-23-14-00", "some_data_2"),
("africa", "2024-10-08-04-29-00", "some_data_3"),
("africa", "2024-10-13-17-14-00", "some_data_4"),
("apac", "2024-10-13-17-14-00", "some_data_5")]
columns = ["region_name", "timestamp", "extra_column"]
input_df = spark.createDataFrame(data, columns)
filter_data = [("emea", "2024-10-15-00-54-00"),
("africa", "2024-10-08-23-14-00"),
("africa", "2024-10-08-04-29-00")]
filter_columns = ["region_name", "timestamp"]
filter_df = spark.createDataFrame(filter_data, filter_columns)
final_filtered_df = input_df.join(
broadcast(filter_df),
["region_name", "timestamp"],
"inner"
)
仅当您的条件采用
(... & ... & ... & ...) | (... & ... & ... & ...) | (... & ... & ... & ...)
的特定格式时,这才有效。原始表中与条件表中任何列匹配的任何行都会被连接选择。
问题 3.-在构建执行计划时,一次可以在 1 个数据帧上应用的过滤器数量有什么限制?
我不知道有限制,但我相信这取决于您的执行环境。