合并spark Dataframe中的行
我有以下数据
ID Name Passport Country License UpdatedtimeStamp
1 Ostrich 12345 - ABC 11-02-2018
1 - - - BCD 10-02-2018
1 Shah 12345 - - 12-02-2018
2 PJ - ANB a 10-02-2018
需要的输出是
ID Name Passport Country License UpdatedtimeStamp
1 Shah 12345 - ABC 12-02-2018
2 PJ - ANB a 10-02-2018
基本上,相同的ID
中的数据应该合并,并且最新更新而不是null
记录应该在输出中,如果所有值都是null
,则应保留null
..
请建议...另外,建议不使用SparkSQL Window
函数,因为我需要它非常快
如果你想完全留在sparkSQL中
val df= Seq((1,Some("ostrich"), Some(12345), None, Some("ABC")," 11-02-2018" ),
(1,None, None, None, Some("BCD"), "10-02-2018"),(1,Some("Shah"), Some(12345), None,None, "12-02-2018"),
(2,Some("PJ"), None, Some("ANB"), Some("a"), "10-02-2018")).toDF("ID","Name","Passport","Country","License","UpdatedtimeStamp")
val df1= df.withColumn("date", to_date($"UpdatedtimeStamp","MM-dd-yyyy" )).drop($"UpdatedtimeStamp")
val win = Window.partitionBy("ID").orderBy($"date".desc)
val df2=df1.select($"*", row_number.over(win).as("r")).orderBy($"ID", $"r").drop("r")
val exprs= df2.columns.drop(1).map(x=>collect_list(x).as(x+"_grp"))
val df3=df2.groupBy("ID").agg(exprs.head,exprs.tail: _*)
val exprs2= df3.columns.drop(1).map(x=> col(x)(0).as(x))
df3.select((Array(col(df2.columns(0)))++exprs2): _*).show
+---+----+--------+-------+-------+----------+
| ID|Name|Passport|Country|License| date|
+---+----+--------+-------+-------+----------+
| 1|Shah| 12345| null| ABC|2018-12-02|
| 2| PJ| null| ANB| a|2018-10-02|
+---+----+--------+-------+-------+----------+
您可以通过定义udf
函数并将收集的struct列传递给udf
函数来实现结果,以便对非零值的空值进行排序和填充。 (注释代码中提供了注释)
import org.apache.spark.sql.functions._
//udf function definition
def sortAndAggUdf = udf((structs: Seq[Row])=>{
//sorting the collected list by timestamp in descending order
val sortedStruct = structs.sortBy(str => str.getAs[Long]("UpdatedtimeStamp"))(Ordering[Long].reverse)
//selecting the first struct and casting to out case class
val first = out(sortedStruct(0).getAs[String]("Name"), sortedStruct(0).getAs[String]("Passport"), sortedStruct(0).getAs[String]("Country"), sortedStruct(0).getAs[String]("License"), sortedStruct(0).getAs[Long]("UpdatedtimeStamp"))
//aggregation for checking nulls and populating first not null value
sortedStruct
.foldLeft(first)((x, y) => {
out(
if(x.Name == null || x.Name.isEmpty) y.getAs[String]("Name") else x.Name,
if(x.Passport == null || x.Passport.isEmpty) y.getAs[String]("Passport") else x.Passport,
if(x.Country == null || x.Country.isEmpty) y.getAs[String]("Country") else x.Country,
if(x.License == null || x.License.isEmpty) y.getAs[String]("License") else x.License,
x.UpdatedtimeStamp)
})
})
//making the rest of the columns as one column and changing the UpdatedtimeStamp column to long for sorting in udf
df.select(col("ID"), struct(col("Name"), col("Passport"), col("Country"), col("License"), unix_timestamp(col("UpdatedtimeStamp"), "MM-dd-yyyy").as("UpdatedtimeStamp")).as("struct"))
//grouping and collecting the structs and passing to udf function for manipulation
.groupBy("ID").agg(sortAndAggUdf(collect_list("struct")).as("struct"))
//separating the aggregated columns to separate columns
.select(col("ID"), col("struct.*"))
//getting the date in correct format
.withColumn("UpdatedtimeStamp", date_format(col("UpdatedtimeStamp").cast("timestamp"), "MM-dd-yyyy"))
.show(false)
哪个应该给你
+---+----+--------+-------+-------+----------------+
|ID |Name|Passport|Country|License|UpdatedtimeStamp|
+---+----+--------+-------+-------+----------------+
|1 |Shah|12345 |null |ABC |12-02-2018 |
|2 |PJ |null |ANB |a |10-02-2018 |
+---+----+--------+-------+-------+----------------+
当然还需要一个案例类
case class out(Name: String, Passport: String, Country: String, License: String, UpdatedtimeStamp: Long)