How do I join two tables in Spark using a date-based sliding window?

Question

I have two Spark DataFrames.

One is a DataFrame of unique IDs and their entry dates, called entry_table:

ID entry_date
A1 2022-02-14
A2 2022-02-14
A5 2022-02-15
B1 2022-02-15
C1 2022-02-16
B5 2022-02-17
A6 2022-02-17
B6 2022-02-17
C6 2022-02-18

The other is their activity_table DataFrame:

ID activity_date activity
A1 2022-02-14 al23o
A1 2022-02-14 pod5
A2 2022-02-14 p32olq
A5 2022-02-15 s1df
B1 2022-02-15 wra21s
A1 2022-02-16 3asDlk
A5 2022-02-17 12ewerf
A6 2022-02-17 powp09
A2 2022-02-18 p32sdolq
A5 2022-02-18 s1ddf
A1 2022-02-18 wraa21s
C1 2022-02-18 13aslk
C5 2022-02-19 712werf
B6 2022-02-19 43pop09
A2 2022-02-19 2p32olq
C5 2022-02-19 8s1df
B1 2022-02-20 wraty21s
A6 2022-02-20 3astlk
B5 2022-02-20 1276werf
C6 2022-02-20 p45op09
My attempt so far only pulls a fixed date range for all IDs:

df_new = spark.sql("""
    SELECT *
    FROM activity_table
    WHERE activity_date >= '2022-02-14'
      AND activity_date < '2022-02-24'
      AND ID IN (SELECT ID FROM entry_table)
""")

I am trying to join the two Spark DataFrames with a sliding window that moves over time. With a 6-day sliding window, user A1 from day 1, whose entry date is 2022-02-14, should only have activity data up to 2022-02-20, while the last user C6 in entry_table, whose entry date is 2022-02-18, should have activity data up to 2022-02-23.
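
In other words, the date bounds should be anchored to each row's entry_date rather than hard-coded. One way to phrase that is a non-equi (range) join; a minimal sketch, assuming both DataFrames are registered as temp views, the window spans entry_date through entry_date + 6 days, and each ID should only keep its own activities (note the answer below instead matches on date alone):

// Hypothetical range-join sketch: each ID's window is bounded by its own
// entry_date instead of global constants. Assumes temp views entry_table
// and activity_table; the string dates are implicitly cast by Spark.
val df_new = spark.sql("""
  SELECT a.*
  FROM entry_table e
  JOIN activity_table a
    ON a.ID = e.ID
   AND a.activity_date >= e.entry_date
   AND a.activity_date <= date_add(e.entry_date, 6)
""")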

apache-spark-sql window date-range
1 Answer
import org.apache.spark.sql.functions._
import spark.implicits._ // needed for .toDF on local Seqs

//create dataset
val data = Seq(("A1","2022-02-14"), 
     ("A2","2022-02-14"), 
     ("A5","2022-02-15"), 
     ("B1","2022-02-15"), 
     ("C1","2022-02-16"), 
     ("B5","2022-02-17"), 
     ("A6","2022-02-17"), 
     ("B6","2022-02-17"), 
     ("C6","2022-02-18")).toDF("ID","entry_date")
// toDF supplies the column names, so the header tuple
// ("ID","activity_date","activity") must not appear as a data row
val activity = Seq(
("A1","2022-02-14","al23o"),
("A1","2022-02-14","pod5"),
("A2","2022-02-14","p32olq"),
("A5","2022-02-15","s1df"),
("B1","2022-02-15","wra21s"),
("A1","2022-02-16","3asDlk"),
("A5","2022-02-17","12ewerf"),
("A6","2022-02-17","powp09"),
("A2","2022-02-18","p32sdolq"),
("A5","2022-02-18","s1ddf"),
("A1","2022-02-18","wraa21s"),
("C1","2022-02-18","13aslk"),
("C5","2022-02-19","712werf"),
("B6","2022-02-19","43pop09"),
("A2","2022-02-19","2p32olq"),
("C5","2022-02-19","8s1df"),
("B1","2022-02-20","wraty21s"),
("A6","2022-02-20","3astlk"),
("B5","2022-02-20","1276werf"),
("C6","2022-02-20","p45op09")).toDF("ID","activity_date","activity")

val date_window = data
 .select( 
  col("ID"), 
  to_date(col("entry_date")).as("entry_Date")) // cast the string column to DateType
 .select( 
  col("ID"),
  col("entry_Date"),
  explode( 
   expr("sequence(entry_Date, date_add(entry_Date, 6), interval 1 day)")).as("activity_window") 
  ) // sequence generates an array of dates covering the window;
   // explode turns that array into one row per date.
   // If you need the previous six days instead, use date_sub:
   // "sequence(date_sub(entry_Date, 6), entry_Date, interval 1 day)"
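
To sanity-check the fan-out, filtering date_window to a single ID shows one row per day of that ID's window; for C6 (entry date 2022-02-18) the sequence runs from 2022-02-18 through 2022-02-24:

// Each entry row explodes into 7 rows: the entry day plus the 6 days after.
date_window.filter(col("ID") === "C6").show()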

date_window
 .join( 
  activity, 
  date_window("activity_window") === activity("activity_date")) // join on the window column;
  // note this matches on date only, so each ID picks up *all* users'
  // activities on the dates in its window (visible in the B5 rows below)
 .groupBy(
  date_window("entry_Date"),
  date_window("ID"))
 .agg( 
  arrays_zip( // zip the two collected lists into one array of (activity, date) structs
   collect_list("activity").as("activity"),
   collect_list("activity_date").as("activity_date"))
  .as("activities_array"))
 .select( 
  col("*"),
  explode(col("activities_array")).as("activities")) // one row per struct in the array
 .select( 
  col("*"),
  col("activities.*")).show() // use '.*' to pull the struct fields into columns (rename as needed)
+----------+---+--------------------+--------------------+--------+----------+
|entry_Date| ID|    activities_array|          activities|       0|         1|
+----------+---+--------------------+--------------------+--------+----------+
|2022-02-17| B5|[[12ewerf, 2022-0...|[12ewerf, 2022-02...| 12ewerf|2022-02-17|
|2022-02-17| B5|[[12ewerf, 2022-0...|[powp09, 2022-02-17]|  powp09|2022-02-17|
|2022-02-17| B5|[[12ewerf, 2022-0...|[p32sdolq, 2022-0...|p32sdolq|2022-02-18|
|2022-02-17| B5|[[12ewerf, 2022-0...| [s1ddf, 2022-02-18]|   s1ddf|2022-02-18|
|2022-02-17| B5|[[12ewerf, 2022-0...|[wraa21s, 2022-02...| wraa21s|2022-02-18|
|2022-02-17| B5|[[12ewerf, 2022-0...|[13aslk, 2022-02-18]|  13aslk|2022-02-18|
|2022-02-17| B5|[[12ewerf, 2022-0...|[712werf, 2022-02...| 712werf|2022-02-19|
|2022-02-17| B5|[[12ewerf, 2022-0...|[43pop09, 2022-02...| 43pop09|2022-02-19|
|2022-02-17| B5|[[12ewerf, 2022-0...|[2p32olq, 2022-02...| 2p32olq|2022-02-19|
|2022-02-17| B5|[[12ewerf, 2022-0...| [8s1df, 2022-02-19]|   8s1df|2022-02-19|
|2022-02-17| B5|[[12ewerf, 2022-0...|[wraty21s, 2022-0...|wraty21s|2022-02-20|
|2022-02-17| B5|[[12ewerf, 2022-0...|[3astlk, 2022-02-20]|  3astlk|2022-02-20|
|2022-02-17| B5|[[12ewerf, 2022-0...|[1276werf, 2022-0...|1276werf|2022-02-20|
|2022-02-17| B5|[[12ewerf, 2022-0...|[p45op09, 2022-02...| p45op09|2022-02-20|
|2022-02-15| B1|[[s1df, 2022-02-1...|  [s1df, 2022-02-15]|    s1df|2022-02-15|
|2022-02-15| B1|[[s1df, 2022-02-1...|[wra21s, 2022-02-15]|  wra21s|2022-02-15|
|2022-02-15| B1|[[s1df, 2022-02-1...|[3asDlk, 2022-02-16]|  3asDlk|2022-02-16|
|2022-02-15| B1|[[s1df, 2022-02-1...|[12ewerf, 2022-02...| 12ewerf|2022-02-17|
|2022-02-15| B1|[[s1df, 2022-02-1...|[powp09, 2022-02-17]|  powp09|2022-02-17|
|2022-02-15| B1|[[s1df, 2022-02-1...|[p32sdolq, 2022-0...|p32sdolq|2022-02-18|
+----------+---+--------------------+--------------------+--------+----------+
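
Because the join above matches on date alone, each ID's window collects every user's activities on those dates, which is why the B5 rows include A5's and A6's activity codes. If you instead want each ID restricted to its own activities, here is a sketch of the same pipeline with an ID equality added to the join condition:

// Variant: also require matching IDs, so each user only collects
// their own activities inside their window.
date_window
 .join( 
  activity, 
  date_window("ID") === activity("ID") &&
   date_window("activity_window") === activity("activity_date"))
 .groupBy(date_window("entry_Date"), date_window("ID"))
 .agg( 
  arrays_zip(
   collect_list("activity").as("activity"),
   collect_list("activity_date").as("activity_date"))
  .as("activities_array"))
 .show()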