I am having trouble doing conditional grouping in a Spark dataframe.
Here is a complete example.
I have a dataframe that is already sorted by user and time:
   activity         location  user
0  watch movie      house     A
1  sleep            house     A
2  cardio           gym       A
3  cardio           gym       B
4  buy biscuits     shop      B
5  cardio           gym       B
6  weight training  gym       B
I only want to perform the sum() when adjacent rows for a given user have the same "location" field, so it is not simply df.groupby(['user','location']).activity.collect(","). The desired output is shown below. Also, the order matters.
activity                location  user
watch movie,sleep       house     A
cardio                  gym       A
cardio                  gym       B
buy biscuits            shop      B
cardio,weight training  gym       B
Similar to the question below, but with a pyspark dataframe (converting the pyspark dataframe to pandas runs out of memory because the dataset is huge): Conditional sum of groupby adjacent rows in pandas
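For reference, a minimal sketch that builds this sample dataframe; the integer "time" column used for ordering is my assumption, based on the index shown above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Reproduce the sample data; the "time" column name is an assumption.
df = spark.createDataFrame(
    [
        (0, "watch movie", "house", "A"),
        (1, "sleep", "house", "A"),
        (2, "cardio", "gym", "A"),
        (3, "cardio", "gym", "B"),
        (4, "buy biscuits", "shop", "B"),
        (5, "cardio", "gym", "B"),
        (6, "weight training", "gym", "B"),
    ],
    ["time", "activity", "location", "user"],
)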
You need two steps to do this. Assuming df is your dataframe:
from pyspark.sql import functions as F, Window as W

# Step 1: compare each row's location with the previous row's location for the
# same user (ordered by time), then take a running sum of the "location changed"
# flag. This assigns one grp_id per block of consecutive rows with the same location.
df = df.withColumn(
    "grp_id", F.lag("location").over(W.partitionBy("user").orderBy("time"))
).withColumn(
    "grp_id",
    F.sum(F.when(F.col("grp_id") == F.col("location"), 0).otherwise(1)).over(
        W.partitionBy("user").orderBy("time").rowsBetween(W.unboundedPreceding, 0)
    ),
)
df.show()
+----+---------------+--------+----+------+
|time|       activity|location|user|grp_id|
+----+---------------+--------+----+------+
|   0|    watch movie|   house|   A|     1|
|   1|          sleep|   house|   A|     1|
|   2|         cardio|     gym|   A|     2|
|   3|         cardio|     gym|   B|     1|
|   4|   buy biscuits|    shop|   B|     2|
|   5|         cardio|     gym|   B|     3|
|   6|weight training|     gym|   B|     3|
+----+---------------+--------+----+------+
# Step 2: group by user, grp_id and location, collecting the activities of each block.
df = df.groupBy("user", "grp_id", "location").agg(F.collect_list("activity"))
df.show(truncate=False)
+----+------+--------+-------------------------+
|user|grp_id|location|collect_list(activity)   |
+----+------+--------+-------------------------+
|A   |1     |house   |[watch movie, sleep]     |
|A   |2     |gym     |[cardio]                 |
|B   |1     |gym     |[cardio]                 |
|B   |2     |shop    |[buy biscuits]           |
|B   |3     |gym     |[cardio, weight training]|
+----+------+--------+-------------------------+
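If you want the exact comma-separated string from the desired output instead of an array, a possible extra step (this part is my sketch, not the original two steps) is to rename the aggregated column and join it with concat_ws:

# Optional final step: flatten the collected array into a comma-separated string
# and drop the helper grp_id column. The ordering and final column name are assumptions.
df = (
    df.withColumnRenamed("collect_list(activity)", "activity")
    .withColumn("activity", F.concat_ws(",", "activity"))
    .orderBy("user", "grp_id")
    .drop("grp_id")
)
df.show(truncate=False)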