I have two Spark DataFrames.
One of them holds unique IDs and their entry dates; this DataFrame is called entry_table:
ID | entry_date |
---|---|
A1 | 2022-02-14 |
A2 | 2022-02-14 |
A5 | 2022-02-15 |
B1 | 2022-02-15 |
C1 | 2022-02-16 |
B5 | 2022-02-17 |
A6 | 2022-02-17 |
B6 | 2022-02-17 |
C6 | 2022-02-18 |
The other holds their activity and is called activity_table:
ID | activity_date | activity |
---|---|---|
A1 | 2022-02-14 | al23o |
A1 | 2022-02-14 | pod5 |
A2 | 2022-02-14 | p32olq |
A5 | 2022-02-15 | s1df |
B1 | 2022-02-15 | wra21s |
A1 | 2022-02-16 | 3asDlk |
A5 | 2022-02-17 | 12ewerf |
A6 | 2022-02-17 | powp09 |
A2 | 2022-02-18 | p32sdolq |
A5 | 2022-02-18 | s1ddf |
A1 | 2022-02-18 | wraa21s |
C1 | 2022-02-18 | 13aslk |
C5 | 2022-02-19 | 712werf |
B6 | 2022-02-19 | 43pop09 |
A2 | 2022-02-19 | 2p32olq |
C5 | 2022-02-19 | 8s1df |
B1 | 2022-02-20 | wraty21s |
A6 | 2022-02-20 | 3astlk |
B5 | 2022-02-20 | 1276werf |
C6 | 2022-02-20 | p45op09 |
df_new = spark.sql("""select * from activity_table WHERE activity_date >= '2022-02-14' AND activity_date < '2022-02-24' AND ID in (SELECT ID FROM entry_table)""")
I am trying to join the two Spark DataFrames so that a sliding window moves over time: with a 6-day sliding window, user A1 from day 1, whose entry date is 2022-02-14, should only have activity data up to 2022-02-20, while the last user in entry_table, C6, whose entry date is 2022-02-18, should have activity data up to 2022-02-23. In other words, the per-user window condition I am after is sketched below.
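A sketch of that window condition (assuming entry_table and activity_table are registered as temp views, as in the query above):

df_new = spark.sql("""
  SELECT e.ID, e.entry_date, a.activity_date, a.activity
  FROM entry_table e
  JOIN activity_table a
    ON a.ID = e.ID
   AND to_date(a.activity_date) >= to_date(e.entry_date)
   AND to_date(a.activity_date) <= date_add(to_date(e.entry_date), 6)
""")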
// imports needed for .toDF on a Seq and for col/to_date/explode/arrays_zip etc.
import spark.implicits._
import org.apache.spark.sql.functions._

// create the entry dataset
val data = Seq(("A1","2022-02-14"),
("A2","2022-02-14"),
("A5","2022-02-15"),
("B1","2022-02-15"),
("C1","2022-02-16"),
("B5","2022-02-17"),
("A6","2022-02-17"),
("B6","2022-02-17"),
("C6","2022-02-18")).toDF("ID","entry_date")
// create the activity dataset (data rows only, no header tuple)
val activity = Seq(
("A1","2022-02-14","al23o"),
("A1","2022-02-14","pod5"),
("A2","2022-02-14","p32olq"),
("A5","2022-02-15","s1df"),
("B1","2022-02-15","wra21s"),
("A1","2022-02-16","3asDlk"),
("A5","2022-02-17","12ewerf"),
("A6","2022-02-17","powp09"),
("A2","2022-02-18","p32sdolq"),
("A5","2022-02-18","s1ddf"),
("A1","2022-02-18","wraa21s"),
("C1","2022-02-18","13aslk"),
("C5","2022-02-19","712werf"),
("B6","2022-02-19","43pop09"),
("A2","2022-02-19","2p32olq"),
("C5","2022-02-19","8s1df"),
("B1","2022-02-20","wraty21s"),
("A6","2022-02-20","3astlk"),
("B5","2022-02-20","1276werf"),
("C6","2022-02-20","p45op09")).toDF("ID","activity_date","activity")
val date_window = data
.select(
col("ID"),
to_date(col("entry_date")).as("entry_Date") ) //make dates
.select(
col("ID"),
col("entry_Date"),
explode (
expr("sequence(entry_Date,date_add(entry_Date, 6), interval 1 day)")).as("activity_window")
) //use sequence to generate dates,
// if you need the previous six days instead, use date_sub:
// "sequence(date_sub(entry_Date, 6), entry_Date, interval 1 day)"
//explode the sequence to generate rows from array
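// at this point date_window holds one row per (ID, day) from entry_Date to entry_Date + 6,
// e.g. A1 expands to 2022-02-14 through 2022-02-20; uncomment to inspect:
// date_window.show(false)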
date_window
.join(
activity,
date_window("activity_window") === activity("activity_date") )// join on window column
.groupBy(
date_window("entry_Date"),
date_window("ID"))
.agg(
arrays_zip( //zip the activity information to make one array with both data points
collect_list("activity").as("activity"),
collect_list("activity_date")
.as("activity_date"))
.as("activities_array") )
.select(
col("*"),
explode( col("activities_array")).as("activities") ) // convert array into rows
.select(
col("*"),
col("activities.*")).show() //use '.*' to pull array into columns (rename as needed)
+----------+---+--------------------+--------------------+--------+----------+
|entry_Date| ID| activities_array| activities| 0| 1|
+----------+---+--------------------+--------------------+--------+----------+
|2022-02-17| B5|[[12ewerf, 2022-0...|[12ewerf, 2022-02...| 12ewerf|2022-02-17|
|2022-02-17| B5|[[12ewerf, 2022-0...|[powp09, 2022-02-17]| powp09|2022-02-17|
|2022-02-17| B5|[[12ewerf, 2022-0...|[p32sdolq, 2022-0...|p32sdolq|2022-02-18|
|2022-02-17| B5|[[12ewerf, 2022-0...| [s1ddf, 2022-02-18]| s1ddf|2022-02-18|
|2022-02-17| B5|[[12ewerf, 2022-0...|[wraa21s, 2022-02...| wraa21s|2022-02-18|
|2022-02-17| B5|[[12ewerf, 2022-0...|[13aslk, 2022-02-18]| 13aslk|2022-02-18|
|2022-02-17| B5|[[12ewerf, 2022-0...|[712werf, 2022-02...| 712werf|2022-02-19|
|2022-02-17| B5|[[12ewerf, 2022-0...|[43pop09, 2022-02...| 43pop09|2022-02-19|
|2022-02-17| B5|[[12ewerf, 2022-0...|[2p32olq, 2022-02...| 2p32olq|2022-02-19|
|2022-02-17| B5|[[12ewerf, 2022-0...| [8s1df, 2022-02-19]| 8s1df|2022-02-19|
|2022-02-17| B5|[[12ewerf, 2022-0...|[wraty21s, 2022-0...|wraty21s|2022-02-20|
|2022-02-17| B5|[[12ewerf, 2022-0...|[3astlk, 2022-02-20]| 3astlk|2022-02-20|
|2022-02-17| B5|[[12ewerf, 2022-0...|[1276werf, 2022-0...|1276werf|2022-02-20|
|2022-02-17| B5|[[12ewerf, 2022-0...|[p45op09, 2022-02...| p45op09|2022-02-20|
|2022-02-15| B1|[[s1df, 2022-02-1...| [s1df, 2022-02-15]| s1df|2022-02-15|
|2022-02-15| B1|[[s1df, 2022-02-1...|[wra21s, 2022-02-15]| wra21s|2022-02-15|
|2022-02-15| B1|[[s1df, 2022-02-1...|[3asDlk, 2022-02-16]| 3asDlk|2022-02-16|
|2022-02-15| B1|[[s1df, 2022-02-1...|[12ewerf, 2022-02...| 12ewerf|2022-02-17|
|2022-02-15| B1|[[s1df, 2022-02-1...|[powp09, 2022-02-17]| powp09|2022-02-17|
|2022-02-15| B1|[[s1df, 2022-02-1...|[p32sdolq, 2022-0...|p32sdolq|2022-02-18|
+----------+---+--------------------+--------------------+--------+----------+