基于另一个数据帧的单列派生具有不常见行的数据帧

问题描述 投票:0回答:1

我遇到了一个问题,我必须根据另一个数据帧的列从数据帧中获取不常见的行。 例子是 第一个数据帧,即 df1

_id 名字
12 abc
56 定义
90 jkl

参考数据框,即 df2 :-

_id 用户名 事件名称
12 abc 一些
34 xyz
56 定义
78
90 jkl 正面

第三个数据框预计有

_id 用户名 事件名称
34 xyz
78

为了实现这一目标,我尝试了两种方法:-

方法一 从 df2 获取 id 列表并使用 isin 来过滤列,如下

val idsList = df1.select("_id").dropDuplicates().map(_.getInt(0)).collect.toList
val dffDf = df2.filter(not($"_id".isin(idsList: _*)))

这种方法的问题是 collect 函数太重,并且需要大量时间,因为所有计算都发生在驱动程序级别。

方法2 使用左反连接

 df2.join(df1, df2("_id") ===  df1("_id"),"leftanti")

但这给了我以下问题

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Max iterations (100) reached for batch Resolution, please set 'spark.sql.analyzer.maxIterations' to a larger value

问题在https://issues.apache.org/jira/browse/SPARK-37222中提到,没有任何解决方案。

还有其他建议可以实现这一目标吗? 任何指针都会很棒。我需要一个有效且不繁重或不耗时的解决方案。 使用spark 3.3.0版本。

dataframe scala apache-spark apache-spark-sql spark-streaming
1个回答
0
投票

以下是解决此问题的多种方法中的两种 -

使用 left_anti

    import spark.implicits._

val input1 = Seq(
  (12, "abc"),
  (56, "def"),
  (90, "jkl")
).toDF("_id", "name")

val input2 = Seq(
  (12, "abc", "some"),
  (34, "xyz", "left"),
  (56, "def", "right"),
  (78, "ghi", "middle"),
  (90, "jkl", "front")
).toDF("_id", "username", "event_name")
println("spark.version : " + spark.version)

val finalDf = input2.join(input1, input2("_id") === input1("_id"), "left_anti")

println("result : ")
finalDf.show

/*
spark.version : 3.3.0
result : 
+---+--------+----------+
|_id|username|event_name|
+---+--------+----------+
| 34|     xyz|      left|
| 78|     ghi|    middle|
+---+--------+----------+
 */

使用 left_outer

    import spark.implicits._
val input1 = Seq(
  (12, "abc"),
  (56, "def"),
  (90, "jkl")
).toDF("_id", "name")

val input2 = Seq(
  (12, "abc", "some"),
  (34, "xyz", "left"),
  (56, "def", "right"),
  (78, "ghi", "middle"),
  (90, "jkl", "front")
).toDF("_id", "username", "event_name")

println("spark.version : " + spark.version)

val finalDf = input2.as("input2").join(input1.as("input1"), input2("_id") === input1("_id"), "left_outer").where(input1("_id").isNull)
println("result : ")
finalDf.select("input2.*").show


/*
spark.version : 3.3.0
result :
+---+--------+----------+
|_id|username|event_name|
+---+--------+----------+
| 34|     xyz|      left|
| 78|     ghi|    middle|
+---+--------+----------+

 */
© www.soinside.com 2019 - 2024. All rights reserved.