Scala如何匹配两个dfs,如果匹配,然后在第一个df更新密钥

问题描述 投票:0回答:1

我有两个数据帧的数据:

selectedPersonDF:

ID    key
1     
2     
3
4
5

selectedDetailsDF:

first  second third  key
--------------------------
1       9       9    777
9       8       8    878
8       10      10   765
10      12      19   909
11      2       20   708

代码:

val personDF = spark.read.option("header", "true").option("inferSchema", "false").csv("person.csv")
val detailsDF = spark.read.option("header", "true").option("inferSchema", "false").csv("details.csv")

val selectedPersonDF=personDF.select((col("ID"),col("key"))).show() 
val selectedDetailsDF=detailsDF.select(col("first"),col("second"),col("third"),col("key")).show()

我必须将selectedPersonDF id列与selectedDetailsDF匹配所有列(First,Second,Third)如果任何列数据与person id匹配,那么我们必须从selectedDetailsDF获取键值并且必须在selectedPersonDF键列中更新。

预期输出(在selectedPersonDF中):

ID    key
1     777
2     708     
3
4
5

并且在从people'df中删除第一行之后,因为它与detailsdf匹配,剩余数据应该存储在另一个df中。

scala apache-spark spark-dataframe
1个回答
1
投票

你可以使用join并使用||条件检查和left join作为

val finalDF = selectedPersonDF.join(selectedDetailsDF.withColumnRenamed("key", "key2"), $"ID" === $"first" || $"ID" === $"second" || $"ID" === $"third", "left")
  .select($"ID", $"key2".as("key"))
  .show(false)

所以finalDF应该给你

+---+----+
|ID |key |
+---+----+
|1  |777 |
|2  |708 |
|3  |null|
|4  |null|
|5  |null|
+---+----+

我们可以在上面的数据框(.na.fill("")列必须是key)上调用StringType()来获取

+---+---+
|ID |key|
+---+---+
|1  |777|
|2  |708|
|3  |   |
|4  |   |
|5  |   |
+---+---+

之后,您可以使用filter将数据帧分离为匹配和非匹配,使用key列,其值为null,并且分别为null

val notMatchingDF = finalDF.filter($"key" === "")
val matchingDF = finalDF.except(notMatchingDF)

如果除了键列以外,selectedDetailsDF的列名称是未知的,则更新

如果第二个数据框的列名称未知,那么您将必须形成未知列的array列作为

val columnsToCheck = selectedDetailsDF.columns.toSet - "key" toList

import org.apache.spark.sql.functions._
val tempSelectedDetailsDF = selectedDetailsDF.select(array(columnsToCheck.map(col): _*).as("array"), col("key").as("key2"))

现在tempSelectedDetailsDF数据框有两列:所有未知列的组合列作为array列,键列重命名为key2

之后,您需要一个udf函数来检查加入时的状况

val arrayContains = udf((array: collection.mutable.WrappedArray[String], value: String) => array.contains(value))

然后你使用对定义的join函数的调用来udf数据帧

val finalDF = selectedPersonDF.join(tempSelectedDetailsDF, arrayContains($"array", $"ID"), "left")
  .select($"ID", $"key2".as("key"))
  .na.fill("")

上面已经定义了其余的过程。

我希望答案是有帮助的,可以理解的。

© www.soinside.com 2019 - 2024. All rights reserved.