我正在使用包含时间序列数据的 pyspark 数据框(Python)。数据的结构如下:
event_time variable value step ID
1456942945 var_a 123.4 1 id_1
1456931076 var_b 857.01 1 id_1
1456932268 var_b 871.74 1 id_1
1456940055 var_b 992.3 2 id_1
1456932781 var_c 861.3 2 id_1
1456937186 var_c 959.6 3 id_1
1456934746 var_d 0.12 4 id_1
1456942945 var_a 123.4 1 id_2
1456931076 var_b 847.01 1 id_2
1456932268 var_b 871.74 1 id_2
1456940055 var_b 932.3 2 id_2
1456932781 var_c 821.3 3 id_2
1456937186 var_c 969.6 4 id_2
1456934746 var_d 0.12 4 id_2
对于每个 id,我在特定的“步骤”获取每个变量的值。
我需要像这样子集这个数据帧:对于每个id,获取与步骤1、2、3相对应的所有行以及从步骤4的first_event时间值开始的步骤4数据的一部分,比方说前25%。此分配应根据活动时间进行。
在根据该 id 对 DF 进行子集化之后,我可以对单个 id 执行此操作:
# single step partitioning
threshold_value = DF.selectExpr(f"percentile_approx({"event_time"}, {0.25}) as threshold").collect()[0]["threshold"]
partitioned_df= DF.filter(col(column_name) <= threshold_value)
# First 3 steps
first_3_steps_df = DF.filter((col("step").isin([1,2,3])))
然后我将连接partitioned_df和first_3_steps_df以获得1个特定id的所需输出。我一直坚持为 DF 中的每个 id 迭代这种分区,而没有实际上单独为每个 id 迭代该过程。
我也可以在 pandas 中做到这一点,但是 DF 很大,我真的需要坚持使用 Pyspark,所以请不要使用 Pandas 答案。
按
ID
对数据进行分组,并使用percentile_approx作为聚合函数来计算step=4的阈值。然后使用这些值创建一个 where
子句来过滤数据:
from pyspark.sql import functions as F
df = ...
threshold = df.where('step = 4') \
.groupBy('ID') \
.agg(F.percentile_approx('event_time', 0.25)) \
.collect()
threshold = [(r[0],r[1]) for r in threshold]
whereStmt = 'step=1 or step=2 or step=3'
for r in threshold:
whereStmt = whereStmt + f' or (step=4 and ID={r[0]} and event_time<={r[1]})'
df.where(F.expr(whereStmt)).show()