I'm facing the challenge of filtering a Spark DataFrame efficiently without using collect, which can cause performance problems on large datasets. Specifically, I need to filter rows of a DataFrame whose column value does not appear in a list of values taken from another DataFrame.
I have two DataFrames, df1 and df2, and I want to filter df1 on one of its columns, say "gold", keeping only values that do not appear in df2's "silver" or "bronze" columns.
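For concreteness, a minimal hypothetical setup (data and names invented purely for illustration) might look like this:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("medal-filter").getOrCreate()
import spark.implicits._

// df1 holds gold medalists, df2 holds silver and bronze medalists.
val df1 = Seq("alice", "bob", "carol", "alice").toDF("gold")
val df2 = Seq(("bob", "dave"), ("erin", "carol")).toDF("silver", "bronze")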
The approach I've tried uses collect, as shown below:
import org.apache.spark.sql.functions._

// df1 and df2 are assumed to be already defined.
// Collect every silver/bronze value to the driver, then filter with isin:
val filteredDF = df1.filter(
  not(col("gold").isin(
    df2.select("silver").union(df2.select("bronze"))
      .collect()
      .map(_.getString(0)): _*
  ))
)
I was able to solve the problem above with Spark SQL:
val result = spark.sql(
"""
|select gold as player_name, count(1) as no_of_player
|from olympic_table
|where gold not in (select silver from olympic_table union all select bronze from olympic_table)
|group by 1
|""".stripMargin)
result.show()
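For reference, the query above assumes a temporary view named olympic_table, with gold, silver, and bronze columns, has already been registered from a DataFrame, for example:

// Hypothetical: olympicDF is a DataFrame with gold, silver and bronze columns.
olympicDF.createOrReplaceTempView("olympic_table")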
However, I'm looking for an alternative way to perform this filtering without resorting to collect. Is there a way to obtain the values from a DataFrame as a list (or any other structure) directly within the DataFrame API, so that the data is never brought back to the driver node?
Any suggestions, insights, or alternative approaches would be greatly appreciated. Thanks!
Method 1: using ANTI JOIN
WITH
olympic AS (
(
SELECT silver AS gold
FROM olympic_table
)
UNION ALL
(
SELECT bronze AS gold
FROM olympic_table
)
),
gold_cte AS (
SELECT
l.gold AS player_name,
COUNT(1) AS no_of_player
FROM olympic_table l
ANTI JOIN olympic r ON l.gold = r.gold
GROUP BY
l.gold
)
SELECT
player_name,
no_of_player
FROM gold_cte
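The same plan can also be expressed directly in the DataFrame API with a left_anti join, which keeps everything distributed and avoids collect. A sketch, assuming df1 holds the gold column and df2 the silver/bronze columns as in the question:

import org.apache.spark.sql.functions._

// All names that appear in either the silver or the bronze column of df2.
val medalists = df2.select(col("silver").as("name"))
  .union(df2.select(col("bronze").as("name")))

// Keep only rows of df1 whose gold value never appears among the medalists,
// then count how many rows survive per name.
val resultDF = df1
  .join(medalists, df1("gold") === medalists("name"), "left_anti")
  .groupBy("gold")
  .agg(count(lit(1)).as("no_of_player"))
  .withColumnRenamed("gold", "player_name")

resultDF.show()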
Method 2: using higher-order functions (HOF)
WITH
gold_cte AS (
SELECT ARRAY_EXCEPT (
COLLECT_LIST (gold),
FLATTEN (
COLLECT_SET (
ARRAY(
silver,
bronze
)
)
)
) as input
FROM olympic_table
)
SELECT INLINE (
ARRAY_DISTINCT (
TRANSFORM (
input, e -> NAMED_STRUCT (
'player_name', e, 'no_of_player', SIZE (FILTER (input, f -> f = e))
)
)
)
)
FROM gold_cte
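If you prefer to stay in the DataFrame API here as well, the same higher-order functions are available in org.apache.spark.sql.functions (Spark 3.x for transform and filter as Scala functions). A sketch, assuming a single olympicDF with gold, silver, and bronze columns:

import org.apache.spark.sql.functions._

// One row containing all gold names minus any name that also appears
// in the silver or bronze columns.
val remaining = olympicDF.agg(
  array_except(
    collect_list(col("gold")),
    flatten(collect_set(array(col("silver"), col("bronze"))))
  ).as("input")
)

// For each surviving name, count its occurrences inside the array,
// deduplicate the resulting structs, and explode them back into rows.
val resultDF = remaining
  .select(explode(array_distinct(
    transform(col("input"), e =>
      struct(
        e.as("player_name"),
        size(filter(col("input"), f => f === e)).as("no_of_player")
      ))
  )).as("r"))
  .select("r.*")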