我需要帮助来填充此案例,用新行填充缺失值:
这只是一个例子,但我有很多行都有不同的
IDs
。
输入数据框:
身份证 | 旗帜 | 日期 |
---|---|---|
123 | 1 | 2021/01/01 |
123 | 0 | 2021 年 1 月 2 日 |
123 | 1 | 2021/01/03 |
123 | 0 | 2021/01/06 |
123 | 0 | 2021/01/08 |
777 | 0 | 2021/01/01 |
777 | 1 | 2021/01/03 |
所以我有一组有限的
dates
,我想直到每个ID
的最后一个(在示例中,对于ID = 123
:01/01/2021、01/02/2021、01/03/ 2021 年...直到 2021 年 1 月 8 日)。所以基本上我可以与日历进行交叉联接,但我不知道在交叉联接之后如何使用规则或过滤器填充缺失值。
预期输出:(以粗体显示生成的缺失值)
身份证 | 旗帜 | 日期 |
---|---|---|
123 | 1 | 2021/01/01 |
123 | 0 | 2021 年 1 月 2 日 |
123 | 1 | 2021/01/03 |
123 | 1 | 2021/01/04 |
123 | 1 | 2021/01/05 |
123 | 0 | 2021/01/06 |
123 | 0 | 2021/01/07 |
123 | 0 | 2021/01/08 |
777 | 0 | 2021/01/01 |
777 | 0 | 2021/01/02 |
777 | 1 | 2021/01/03 |
您可以先按
id
分组来计算最大和最小date
,然后使用sequence
函数,生成从min_date
到max_date
的所有日期。最后,与原始数据帧连接并用每组 id
的最后一个非空填充空值。这是一个完整的工作示例:
您的输入数据框:
from pyspark.sql import Window
import pyspark.sql.functions as F
df = spark.createDataFrame([
(123, 1, "01/01/2021"), (123, 0, "01/02/2021"),
(123, 1, "01/03/2021"), (123, 0, "01/06/2021"),
(123, 0, "01/08/2021"), (777, 0, "01/01/2021"),
(777, 1, "01/03/2021")
], ["id", "flag", "date"])
按
id
分组并为每个id
生成所有可能的日期:
all_dates_df = df.groupBy("id").agg(
F.date_trunc("mm", F.max(F.to_date("date", "dd/MM/yyyy"))).alias("max_date"),
F.date_trunc("mm", F.min(F.to_date("date", "dd/MM/yyyy"))).alias("min_date")
).select(
"id",
F.expr("sequence(min_date, max_date, interval 1 month)").alias("date")
).withColumn(
"date", F.explode("date")
).withColumn(
"date",
F.date_format("date", "dd/MM/yyyy")
)
现在,与
df
进行左连接,并在由 last
分区的窗口上使用
id
函数来填充空值:
w = Window.partitionBy("id").orderBy("date")
result = all_dates_df.join(df, ["id", "date"], "left").select(
"id",
"date",
*[F.last(F.col(c), ignorenulls=True).over(w).alias(c)
for c in df.columns if c not in ("id", "date")
]
)
result.show()
#+---+----------+----+
#| id| date|flag|
#+---+----------+----+
#|123|01/01/2021| 1|
#|123|01/02/2021| 0|
#|123|01/03/2021| 1|
#|123|01/04/2021| 1|
#|123|01/05/2021| 1|
#|123|01/06/2021| 0|
#|123|01/07/2021| 0|
#|123|01/08/2021| 0|
#|777|01/01/2021| 0|
#|777|01/02/2021| 0|
#|777|01/03/2021| 1|
#+---+----------+----+
您可以找到当前行和下一行中
DATE
值之间的日期范围,然后使用 sequence
生成所有中间日期并分解此数组以填充缺失日期的值。
from pyspark.sql import functions as F
from pyspark.sql import Window
data = [(123, 1, "01/01/2021",),
(123, 0, "01/02/2021",),
(123, 1, "01/03/2021",),
(123, 0, "01/06/2021",),
(123, 0, "01/08/2021",),
(777, 0, "01/01/2021",),
(777, 1, "01/03/2021",), ]
df = spark.createDataFrame(data, ("ID", "FLAG", "DATE",)).withColumn("DATE", F.to_date(F.col("DATE"), "dd/MM/yyyy"))
window_spec = Window.partitionBy("ID").orderBy("DATE")
next_date = F.coalesce(F.lead("DATE", 1).over(window_spec), F.col("DATE") + F.expr("interval 1 month"))
end_date_range = next_date - F.expr("interval 1 month")
df.withColumn("Ranges", F.sequence(F.col("DATE"), end_date_range, F.expr("interval 1 month")))\
.withColumn("DATE", F.explode("Ranges"))\
.withColumn("DATE", F.date_format("date", "dd/MM/yyyy"))\
.drop("Ranges").show(truncate=False)
+---+----+----------+
|ID |FLAG|DATE |
+---+----+----------+
|123|1 |01/01/2021|
|123|0 |01/02/2021|
|123|1 |01/03/2021|
|123|1 |01/04/2021|
|123|1 |01/05/2021|
|123|0 |01/06/2021|
|123|0 |01/07/2021|
|123|0 |01/08/2021|
|777|0 |01/01/2021|
|777|0 |01/02/2021|
|777|1 |01/03/2021|
+---+----+----------+