如何拆分 pyspark 数据帧,为每个不同的 id 获取一部分数据

问题描述 投票:0回答:1

我正在使用包含时间序列数据的 pyspark 数据框(Python)。数据的结构如下:

event_time  variable value   step   ID 
1456942945  var_a    123.4    1      id_1
1456931076  var_b    857.01   1      id_1
1456932268  var_b    871.74   1      id_1
1456940055  var_b    992.3    2      id_1
1456932781  var_c    861.3    2      id_1
1456937186  var_c    959.6    3      id_1
1456934746  var_d    0.12     4      id_1

1456942945  var_a    123.4    1      id_2
1456931076  var_b    847.01   1      id_2
1456932268  var_b    871.74   1      id_2
1456940055  var_b    932.3    2      id_2
1456932781  var_c    821.3    3      id_2
1456937186  var_c    969.6    4      id_2
1456934746  var_d    0.12     4      id_2

对于每个 id,我在特定的“步骤”获取每个变量的值。

我需要像这样子集这个数据帧:对于每个id,获取与步骤1、2、3相对应的所有行以及从步骤4的first_event时间值开始的步骤4数据的一部分,比方说前25%。此分配应根据活动时间进行。

在根据该 id 对 DF 进行子集化之后,我可以对单个 id 执行此操作:

# single step partitioning 
threshold_value = DF.selectExpr(f"percentile_approx({"event_time"}, {0.25}) as threshold").collect()[0]["threshold"]

partitioned_df= DF.filter(col(column_name) <= threshold_value)

# First 3 steps
first_3_steps_df = DF.filter((col("step").isin([1,2,3])))

然后我将连接partitioned_df和first_3_steps_df以获得1个特定id的所需输出。我一直坚持为 DF 中的每个 id 迭代这种分区,而没有实际上单独为每个 id 迭代该过程。

我也可以在 pandas 中做到这一点,但是 DF 很大,我真的需要坚持使用 Pyspark,所以请不要使用 Pandas 答案。

python pyspark
1个回答
0
投票

ID
对数据进行分组,并使用percentile_approx作为聚合函数来计算step=4的阈值。然后使用这些值创建一个
where
子句来过滤数据:

from pyspark.sql import functions as F

df = ...

threshold = df.where('step = 4') \
    .groupBy('ID') \
    .agg(F.percentile_approx('event_time', 0.25)) \
    .collect()

threshold = [(r[0],r[1]) for r in threshold]

whereStmt = 'step=1 or step=2 or step=3'

for r in threshold:
    whereStmt = whereStmt + f' or (step=4 and ID={r[0]} and event_time<={r[1]})'

df.where(F.expr(whereStmt)).show()
© www.soinside.com 2019 - 2024. All rights reserved.