我正在寻找一种优雅的方法来删除 DataFrame 中最近出现的 'TEST_COMPONENT' 为 'UNSATISFACTORY' 之前发生的所有记录,基于每个 ID 的 'TEST_DT' 值。
例如,给定以下 ID 5000 的 DataFrame:
| ID | TEST_ID | TEST_COMPONENT | TEST_DT |
|------|---------|----------------|-------------------------------|
| 5000 | ENGL | SATISFACTORY | 2023-01-04T00:00:00.000+11:00 |
| 5000 | ENGL | SATISFACTORY | 2022-09-07T00:00:00.000+10:00 |
| 5000 | OTHER | NONE | 2022-09-07T00:00:00.000+10:00 |
| 5000 | ENGL | UNSATISFACTORY | 2016-05-23T00:00:00.000+10:00 |
| 5000 | OTHER | NONE | 2016-05-23T00:00:00.000+10:00 |
| 5000 | OTHER | NONE | 2016-05-23T00:00:00.000+10:00 |
| 5000 | OTHER | NONE | 2016-02-09T00:00:00.000+11:00 |
| 5000 | OTHER | NONE | 2016-02-09T00:00:00.000+11:00 |
| 5000 | OTHER | NONE | 2016-02-09T00:00:00.000+11:00 |
| 5000 | ENGL | UNSATISFACTORY | 2014-05-29T00:00:00.000+10:00 |
| 5000 | OTHER | NONE | 2013-09-27T00:00:00.000+10:00 |
我只想保留最新的“UNSATISFACTORY” 记录以后的行。此示例所需的输出为:
| ID | TEST_ID | TEST_COMPONENT | TEST_DT |
|------|---------|----------------|-------------------------------|
| 5000 | ENGL | SATISFACTORY | 2023-01-04T00:00:00.000+11:00 |
| 5000 | ENGL | SATISFACTORY | 2022-09-07T00:00:00.000+10:00 |
| 5000 | OTHER | NONE | 2022-09-07T00:00:00.000+10:00 |
| 5000 | ENGL | UNSATISFACTORY | 2016-05-23T00:00:00.000+10:00 |
如何利用 PySpark 高效地实现这一目标?
df_max = df.filter(col("TEST_COMPONENT")=="UNSATISFACTORY")\
.groupBy("ID")\
.agg(max("TEST_DT")\
.alias("LATEST_UNSAT"))
df = df.alias("a").join(\
df_max.alias("b")\
,df.ID == df_max.ID\
,"left")
df = df.filter(col("TEST_DT") >= col("LATEST_UNSAT"))
这种方法有效,但我愿意听到更好的想法。