PySpark 生成缺失的日期并用以前的值填充数据

问题描述 投票:0回答:2

我需要帮助来填充此案例,用新行填充缺失值:

这只是一个例子,但我有很多行都有不同的

IDs

输入数据框

身份证 旗帜 日期
123 1 2021/01/01
123 0 2021 年 1 月 2 日
123 1 2021/01/03
123 0 2021/01/06
123 0 2021/01/08
777 0 2021/01/01
777 1 2021/01/03

所以我有一组有限的

dates
,我想直到每个
ID
的最后一个(在示例中,对于
ID = 123
:01/01/2021、01/02/2021、01/03/ 2021 年...直到 2021 年 1 月 8 日)。所以基本上我可以与日历进行交叉联接,但我不知道在交叉联接之后如何使用规则或过滤器填充缺失值。

预期输出:(以粗体显示生成的缺失值)

身份证 旗帜 日期
123 1 2021/01/01
123 0 2021 年 1 月 2 日
123 1 2021/01/03
123 1 2021/01/04
123 1 2021/01/05
123 0 2021/01/06
123 0 2021/01/07
123 0 2021/01/08
777 0 2021/01/01
777 0 2021/01/02
777 1 2021/01/03
dataframe apache-spark pyspark apache-spark-sql cross-join
2个回答
10
投票

您可以先按

id
分组来计算最大和最小
date
,然后使用
sequence
函数,生成从
min_date
max_date
的所有日期。最后,与原始数据帧连接并用每组
id
的最后一个非空填充空值。这是一个完整的工作示例:

您的输入数据框:

from pyspark.sql import Window
import pyspark.sql.functions as F

df = spark.createDataFrame([
    (123, 1, "01/01/2021"), (123, 0, "01/02/2021"),
    (123, 1, "01/03/2021"), (123, 0, "01/06/2021"),
    (123, 0, "01/08/2021"), (777, 0, "01/01/2021"),
    (777, 1, "01/03/2021")
], ["id", "flag", "date"])

id
分组并为每个
id
生成所有可能的日期:

all_dates_df = df.groupBy("id").agg(
    F.date_trunc("mm", F.max(F.to_date("date", "dd/MM/yyyy"))).alias("max_date"),
    F.date_trunc("mm", F.min(F.to_date("date", "dd/MM/yyyy"))).alias("min_date")
).select(
    "id",
    F.expr("sequence(min_date, max_date, interval 1 month)").alias("date")
).withColumn(
    "date", F.explode("date")
).withColumn(
    "date",
    F.date_format("date", "dd/MM/yyyy")
)

现在,与

df
进行左连接,并在由 last
 分区的窗口上使用 
id
 函数来填充空值:

w = Window.partitionBy("id").orderBy("date")

result = all_dates_df.join(df, ["id", "date"], "left").select(
    "id",
    "date",
    *[F.last(F.col(c), ignorenulls=True).over(w).alias(c)
      for c in df.columns if c not in ("id", "date")
     ]
)

result.show()
#+---+----------+----+
#| id|      date|flag|
#+---+----------+----+
#|123|01/01/2021|   1|
#|123|01/02/2021|   0|
#|123|01/03/2021|   1|
#|123|01/04/2021|   1|
#|123|01/05/2021|   1|
#|123|01/06/2021|   0|
#|123|01/07/2021|   0|
#|123|01/08/2021|   0|
#|777|01/01/2021|   0|
#|777|01/02/2021|   0|
#|777|01/03/2021|   1|
#+---+----------+----+

5
投票

您可以找到当前行和下一行中

DATE
值之间的日期范围,然后使用
sequence
生成所有中间日期并分解此数组以填充缺失日期的值。

from pyspark.sql import functions as F
from pyspark.sql import Window

data = [(123, 1, "01/01/2021",),
        (123, 0, "01/02/2021",),
        (123, 1, "01/03/2021",),
        (123, 0, "01/06/2021",),
        (123, 0, "01/08/2021",),
        (777, 0, "01/01/2021",),
        (777, 1, "01/03/2021",), ]

df = spark.createDataFrame(data, ("ID", "FLAG", "DATE",)).withColumn("DATE", F.to_date(F.col("DATE"), "dd/MM/yyyy"))

window_spec = Window.partitionBy("ID").orderBy("DATE")

next_date = F.coalesce(F.lead("DATE", 1).over(window_spec), F.col("DATE") + F.expr("interval 1 month"))

end_date_range = next_date - F.expr("interval 1 month")


df.withColumn("Ranges", F.sequence(F.col("DATE"), end_date_range, F.expr("interval 1 month")))\
  .withColumn("DATE", F.explode("Ranges"))\
  .withColumn("DATE", F.date_format("date", "dd/MM/yyyy"))\
  .drop("Ranges").show(truncate=False)

输出

+---+----+----------+
|ID |FLAG|DATE      |
+---+----+----------+
|123|1   |01/01/2021|
|123|0   |01/02/2021|
|123|1   |01/03/2021|
|123|1   |01/04/2021|
|123|1   |01/05/2021|
|123|0   |01/06/2021|
|123|0   |01/07/2021|
|123|0   |01/08/2021|
|777|0   |01/01/2021|
|777|0   |01/02/2021|
|777|1   |01/03/2021|
+---+----+----------+
© www.soinside.com 2019 - 2024. All rights reserved.