我遇到了一个问题,我必须根据另一个数据帧的列从数据帧中获取不常见的行。 例子是 第一个数据帧,即 df1
_id | 名字 |
---|---|
12 | abc |
56 | 定义 |
90 | jkl |
参考数据框,即 df2 :-
_id | 用户名 | 事件名称 |
---|---|---|
12 | abc | 一些 |
34 | xyz | 左 |
56 | 定义 | 对 |
78 | 吉 | 中 |
90 | jkl | 正面 |
第三个数据框预计有
_id | 用户名 | 事件名称 |
---|---|---|
34 | xyz | 左 |
78 | 吉 | 中 |
为了实现这一目标,我尝试了两种方法:-
方法一 从 df2 获取 id 列表并使用 isin 来过滤列,如下
val idsList = df1.select("_id").dropDuplicates().map(_.getInt(0)).collect.toList
val dffDf = df2.filter(not($"_id".isin(idsList: _*)))
这种方法的问题是 collect 函数太重,并且需要大量时间,因为所有计算都发生在驱动程序级别。
方法2 使用左反连接
df2.join(df1, df2("_id") === df1("_id"),"leftanti")
但这给了我以下问题
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Max iterations (100) reached for batch Resolution, please set 'spark.sql.analyzer.maxIterations' to a larger value
问题在https://issues.apache.org/jira/browse/SPARK-37222中提到,没有任何解决方案。
还有其他建议可以实现这一目标吗? 任何指针都会很棒。我需要一个有效且不繁重或不耗时的解决方案。 使用spark 3.3.0版本。
以下是解决此问题的多种方法中的两种 -
使用 left_anti
import spark.implicits._
val input1 = Seq(
(12, "abc"),
(56, "def"),
(90, "jkl")
).toDF("_id", "name")
val input2 = Seq(
(12, "abc", "some"),
(34, "xyz", "left"),
(56, "def", "right"),
(78, "ghi", "middle"),
(90, "jkl", "front")
).toDF("_id", "username", "event_name")
println("spark.version : " + spark.version)
val finalDf = input2.join(input1, input2("_id") === input1("_id"), "left_anti")
println("result : ")
finalDf.show
/*
spark.version : 3.3.0
result :
+---+--------+----------+
|_id|username|event_name|
+---+--------+----------+
| 34| xyz| left|
| 78| ghi| middle|
+---+--------+----------+
*/
使用 left_outer
import spark.implicits._
val input1 = Seq(
(12, "abc"),
(56, "def"),
(90, "jkl")
).toDF("_id", "name")
val input2 = Seq(
(12, "abc", "some"),
(34, "xyz", "left"),
(56, "def", "right"),
(78, "ghi", "middle"),
(90, "jkl", "front")
).toDF("_id", "username", "event_name")
println("spark.version : " + spark.version)
val finalDf = input2.as("input2").join(input1.as("input1"), input2("_id") === input1("_id"), "left_outer").where(input1("_id").isNull)
println("result : ")
finalDf.select("input2.*").show
/*
spark.version : 3.3.0
result :
+---+--------+----------+
|_id|username|event_name|
+---+--------+----------+
| 34| xyz| left|
| 78| ghi| middle|
+---+--------+----------+
*/