我有一个 pyspark df,其中我使用 windows + udf 函数的组合来计算历史业务日期的标准偏差。挑战是我的 df 在没有交易时缺少日期。如何计算包含这些缺失日期的 std dev,而不将它们作为附加行添加到我的 df 中以限制 df 大小超出内存。
样本表和电流输出
| ID | Date | Amount | Std_Dev|
|----|----------|--------|--------|
|1 |2021-03-24| 10000 | |
|1 |2021-03-26| 5000 | |
|1 |2021-03-29| 10000 |2886.751|
当前代码
from pyspark.sql.functions import udf,first,Window,withColumn
import numpy as np
from pyspark.sql.types import IntegerType
windowSpec = Window.partitionBy("ID").orderBy("date")
workdaysUDF = F.udf(lambda date1, date2: int(np.busday_count(date2, date1)) if (date1 is not None and date2 is not None) else None, IntegerType()) # UDF to calculate difference between business days#
df = df.withColumn("date_dif", workdaysUDF(F.col('Date'), F.first(F.col('Date')).over(windowSpec))) #column calculating business date diff#
windowval = lambda days: Window.partitionBy('id').orderBy('date_dif').rangeBetween(-days, 0)
df = df.withColumn("std_dev",F.stddev("amount").over(windowval(6))\
.drop("date_dif")
所需输出,其中 3 月 24 日至 29 日之间缺失的日期值将替换为 0。
| ID | Date | Amount | Std_Dev|
|----|----------|--------|--------|
|1 |2021-03-24| 10000 | |
|1 |2021-03-26| 5000 | |
|1 |2021-03-29| 10000 |4915.96 |
请注意,我仅显示单个日期的标准开发以进行说明,因为我使用滚动窗口函数,所以每一行都有值。 任何帮助将不胜感激。
PS:Pyspark 企业版本是 2.2.0,所以我没有灵活性更改版本。
谢谢, VSG
一种方法是使用 pyspark.sql.functions.sequence — PySpark 3.5.3 文档。
假设您有最小和最大日期,您可以在 DataFrame 中生成一个间隔,作为“参考”日历序列。
类似的东西
df = df\
.groupBy() \
.agg(
F.min("Date").alias("min"), F.max("Date").alias("max")
)\
.withColumn(
"sequence",
F.explode(F.sequence(F.col("min"), F.col("max")))
)\
.drop("min", "max")
这可以与原始 DataFrame 内部连接,以获得正确的结构,然后遵循您之前的逻辑。