我有一个时间序列数据集。我希望创建一个新列来表示最后报告的(非空)值。我想我已经通过使用
lag
和 last
的组合解决了这部分问题
我还想知道最后报告的(非空)值的时间戳。我从不期望
timestamp_ms
为空,尽管 val
可以为空。
样本数据
df = spark.createDataFrame([
Row(timestamp_ms=1672531200000, val='19'),
Row(timestamp_ms=1672532100000, val='20'),
Row(timestamp_ms=1672533000000, val=None),
Row(timestamp_ms=1672533900000, val='22'),
Row(timestamp_ms=1672534800000, val=None),
Row(timestamp_ms=1672535700000, val=None),
Row(timestamp_ms=1672536600000, val='25'),
Row(timestamp_ms=1672537500000, val='20'),
Row(timestamp_ms=1672538400000, val='27')
])
df.show()
示例代码
返回最后一个滞后值并尝试返回报告该值时的时间戳。
df_lag = df.withColumn("lag_prev_val", F.lag("val")\
.over(Window.partitionBy()\
.orderBy("timestamp_ms"))
)\
.withColumn("last_lag_prev_val", F.last("lag_prev_val", True)\
.over(Window.partitionBy()\
.orderBy("timestamp_ms"))
)\
.withColumn("last_lag_prev_time", F.lag("timestamp_ms")\
.over(Window.partitionBy()\
.orderBy("timestamp_ms"))
)
df_lag.show()
电流输出
last_lag_prev_time
表示之前报告的时间戳,而不是与 last_lag_prev_val
关联的时间戳
timestamp_ms | 瓦尔 | lag_prev_val | last_lag_prev_val | last_lag_prev_time |
---|---|---|---|---|
1672531200000 | 19 | 空 | 空 | 空 |
1672532100000 | 20 | 19 | 19 | 1672531200000 |
1672533000000 | 空 | 20 | 20 | 1672532100000 |
1672533900000 | 22 | 空 | 20 | 1672533000000 |
1672534800000 | 空 | 22 | 22 | 1672533900000 |
1672535700000 | 空 | 空 | 22 | 1672534800000 |
1672536600000 | 25 | 空 | 22 | 1672535700000 |
1672537500000 | 20 | 25 | 25 | 1672536600000 |
1672538400000 | 27 | 20 | 20 | 1672537500000 |
理想输出
我想要的输出(粗体显示差异)是用于
last_lag_prev_time
列来表示 timestamp_ms
值,该值与用于填充 `last_lag_prev_val' 的原始 val
位于同一行
timestamp_ms | 瓦尔 | lag_prev_val | last_lag_prev_val | last_lag_prev_time |
---|---|---|---|---|
1672531200000 | 19 | 空 | 空 | 空 |
1672532100000 | 20 | 19 | 19 | 1672531200000 |
1672533000000 | 空 | 20 | 20 | 1672532100000 |
1672533900000 | 22 | 空 | 20 | 1672532100000 |
1672534800000 | 空 | 22 | 22 | 1672533900000 |
1672535700000 | 空 | 空 | 22 | 1672533900000 |
1672536600000 | 25 | 空 | 22 | 1672533900000 |
1672537500000 | 20 | 25 | 25 | 1672536600000 |
1672538400000 | 27 | 20 | 20 | 1672537500000 |
一种解决方案是只考虑没有 NULL 值的行中的时间戳,我们可以通过创建一个名为
val_timestamp_ms
的列来实现,然后从这个新列中获取最后一个时间戳并应用滞后。例如:
df.withColumn("lag_prev_val", F.lag("val")\
.over(Window.partitionBy()\
.orderBy("timestamp_ms"))
)\
.withColumn("last_lag_prev_val", F.last("lag_prev_val", True)\
.over(Window.partitionBy()\
.orderBy("timestamp_ms"))
)\
.withColumn("val_timestamp_ms", F.when(F.col("val").isNull(), None)\
.otherwise(F.col("timestamp_ms"))
)\
.withColumn("last_prev_time", F.last("val_timestamp_ms", True)\
.over(Window.partitionBy()\
.orderBy("timestamp_ms"))
)\
.withColumn("last_lag_prev_time", F.lag("last_prev_time")\
.over(Window.partitionBy()\
.orderBy("timestamp_ms"))
)
+-------------+----+------------+-----------------+----------------+--------------+------------------+
| timestamp_ms| val|lag_prev_val|last_lag_prev_val|val_timestamp_ms|last_prev_time|last_lag_prev_time|
+-------------+----+------------+-----------------+----------------+--------------+------------------+
|1672531200000| 19| NULL| NULL| 1672531200000| 1672531200000| NULL|
|1672532100000| 20| 19| 19| 1672532100000| 1672532100000| 1672531200000|
|1672533000000|NULL| 20| 20| NULL| 1672532100000| 1672532100000|
|1672533900000| 22| NULL| 20| 1672533900000| 1672533900000| 1672532100000|
|1672534800000|NULL| 22| 22| NULL| 1672533900000| 1672533900000|
|1672535700000|NULL| NULL| 22| NULL| 1672533900000| 1672533900000|
|1672536600000| 25| NULL| 22| 1672536600000| 1672536600000| 1672533900000|
|1672537500000| 20| 25| 25| 1672537500000| 1672537500000| 1672536600000|
|1672538400000| 27| 20| 20| 1672538400000| 1672538400000| 1672537500000|
+-------------+----+------------+-----------------+----------------+--------------+------------------+