Here is my Spark dataframe,
+---+----------------------------------+----------+----------+
|id |timestamp |Fname |Lname |
+---+----------------------------------+----------+----------+
|1 |2024-01-19T11:52:44.775205Z |Robert |Albert |
|1 |2024-01-20T11:52:44.775205Z |Remo |Lergos |
|2 |2024-01-21T11:52:44.775205Z |Charlie |Jameson |
|2 |2024-01-22T11:52:44.775205Z |Anastacio |Sporer |
|2 |2024-01-23T11:52:44.775205Z |Luz |Toy |
|3 |2024-01-24T11:52:44.775205Z |Crystal |Hills |
|3 |2024-01-25T11:52:44.775205Z |Nicholas |Johnson |
+---+----------------------------------+----------+----------+
Here are the steps involved: for each id, pick the latest Fname/Lname by timestamp, keep that latest timestamp, and collect all of the names into a list.
Following these steps, I am trying to produce the result dataframe below,
+----+--------------------------------------+----------------------------+------------------------------------------------------------------------------------------------------------+
|id  |latest_names                          |latest_timestamp            |all_names                                                                                                   |
+----+--------------------------------------+----------------------------+------------------------------------------------------------------------------------------------------------+
|1   |{"Fname":"Remo","Lname":"Lergos"}     |2024-01-20T11:52:44.775205Z |[{"Fname":"Remo","Lname":"Lergos"},{"Fname":"Robert","Lname":"Albert"}]                                     |
|2   |{"Fname":"Luz","Lname":"Toy"}         |2024-01-23T11:52:44.775205Z |[{"Fname":"Luz","Lname":"Toy"},{"Fname":"Anastacio","Lname":"Sporer"},{"Fname":"Charlie","Lname":"Jameson"}]|
|3   |{"Fname":"Nicholas","Lname":"Johnson"}|2024-01-25T11:52:44.775205Z |[{"Fname":"Nicholas","Lname":"Johnson"},{"Fname":"Crystal","Lname":"Hills"}]                                |
+----+--------------------------------------+----------------------------+------------------------------------------------------------------------------------------------------------+
I tried the following PySpark code with a window spec (ordered by timestamp descending) to pick the first element per id,
import pyspark.sql.functions as F
from pyspark.sql import Window

windowspec = Window.partitionBy("id").orderBy(df["timestamp"].desc())
columns_names = ["Fname", "Lname"]

result = (
    df
    # one JSON string per row, collected later into all_names
    .withColumn("all_names", F.to_json(F.struct("Fname", "Lname")))
    # first() over the timestamp-descending window = values from the latest row
    .withColumn(
        "latest_names",
        F.to_json(
            F.struct(*[F.first(field).over(windowspec).alias(field) for field in columns_names])
        ),
    )
    .withColumn("latest_timestamp", F.first("timestamp").over(windowspec))
    .groupBy("id")
    .agg(
        F.collect_set("all_names").alias("all_names"),
        F.first("latest_names").alias("latest_names"),
        F.first("latest_timestamp").alias("latest_timestamp"),
    )
)
I was able to get the result, but I am wondering whether there is a better way, because I have several other columns that need the same treatment. Besides (Fname, Lname), I have address columns (address1, address2, address3) for which I want to pick the latest values in the same way. I currently do it with a single window spec, but is there a better approach?
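For illustration, this is roughly how I reuse that single window spec across several column groups today (just a sketch; the address1/address2/address3 columns are hypothetical stand-ins for my real ones):

import pyspark.sql.functions as F
from pyspark.sql import Window

windowspec = Window.partitionBy("id").orderBy(F.col("timestamp").desc())

# one output column per group of source columns (the address columns are hypothetical)
column_groups = {
    "latest_names": ["Fname", "Lname"],
    "latest_address": ["address1", "address2", "address3"],
}

result = df
for out_col, cols in column_groups.items():
    # first() over the timestamp-descending window = value from the latest row per id
    result = result.withColumn(
        out_col,
        F.to_json(F.struct(*[F.first(c).over(windowspec).alias(c) for c in cols])),
    )

This works, but every group adds more window expressions, hence my question about a better approach.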
Give this a try and let me know what you think:
import pyspark.sql.functions as f

df = (
    df
    .groupBy('id')
    .agg(
        # keep every (timestamp, Fname, Lname) combination per id
        f.collect_list(f.struct('timestamp', 'Fname', 'Lname')).alias('all_names'),
        f.max('timestamp').alias('latest_timestamp')
    )
    # keep only the struct whose timestamp matches the latest one
    .withColumn('latest_name', f.expr("filter(all_names, x -> x.timestamp = latest_timestamp)")[0])
)
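If you also want the exact JSON shape from your expected output, plus the same treatment for the (hypothetical) address1/address2/address3 columns, the aggregation itself can be parameterized; here's a sketch assuming those columns exist and Spark 3.3+ for max_by:

import pyspark.sql.functions as f

# one entry per group of columns to collapse into a JSON struct
# (the address columns are hypothetical placeholders)
groups = {
    'names': ['Fname', 'Lname'],
    'address': ['address1', 'address2', 'address3'],
}

aggs = [f.max('timestamp').alias('latest_timestamp')]
for name, cols in groups.items():
    # max_by picks the struct from the row with the maximum timestamp
    aggs.append(f.to_json(f.max_by(f.struct(*cols), f.col('timestamp'))).alias(f'latest_{name}'))
    # collect_list of structs + to_json gives the JSON-array string per id
    aggs.append(f.to_json(f.collect_list(f.struct(*cols))).alias(f'all_{name}'))

result = df.groupBy('id').agg(*aggs)

This stays a single groupBy with no window, and adding another column group is just another dict entry.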