我有一个包含 300 万多条记录和几列的数据集。这是我的数据集的示例:
项目 | item_base | 日期 | 数量_1 | 数量_2 |
---|---|---|---|---|
1 | 20 | 202410 | 600 | 7493 |
1 | 20 | 202411 | 17000 | 16431 |
每个
item-item_base-date
都会生成一个唯一的密钥。我需要根据以下逻辑计算新列“actual_value”:
假设我们对表中的 item-item_base 进行排名操作并按日期排序,那么,
对于
rank = 1
,actual_value = quantity_1
,rank = 2
,actual_value = quantity_1 - quantity_2
。 rank > 2
,actual_value = quantity_1 - sum(all prev quantity_1) - sum(all prev quantity_2) - sum(all prev actual_value)
这是我解决这个问题的方法:
首先,我创建 2 个附加列
cumulative_1
和 cumulative_2
,它们基本上是使用 SQL 窗口的数量_1 和数量_2 的总和。SUM(quantity_1) OVER(PARTITION BY item, item_base ORDER BY date ROWS BETWEEN UNBOUNDED PRECEIDING AND 1 PRECEDING) as cumulative_1
等等。另外,我正在创建一个排名列作为 row_no 标识符。
Spark 不支持递归 CTE,因此实现
sum(all prev actual_value)
非常繁琐。我不得不切换到 pandas dataframe 来完成计算。这是我的代码:
my_df = df.toPandas()
my_df['actual_value'] = 0.0
for i in range(len(my_df)):
if my_df.at[i, 'rank'] == 1:
my_df.at[i, 'actual_value'] = my_df.at[i, 'quantity_1']
elif my_df.at[i, 'rank'] == 2:
my_df.at[i, 'actual_value'] = my_df.at[i, 'quantity_1'] - my_df.at[i, 'quantity_2']
else:
previous_actual_values = my_df.loc[(my_df['item'] == my_df.at[i, 'item']) &
(my_df['item_base'] == my_df.at[i, 'item_base']) &
(my_df['date'] < my_df.at[i, 'date']), 'actual_value'].sum()
my_df.at[i, 'actual_value'] = my_df.at[i, 'quantity_1'] - my_df.at[i, 'cumulative_2'] - my_df.at[i, 'cumulative_1'] - previous_actual_values
if my_df.at[i, 'actual_value'] < 0:
my_df.at[i, 'actual_value'] = 0
代码完成了工作并给了我正确的输出。
item | item_base| date | quantity_1 | quantity_2 | cumulative_1 | cumulative_2 | rank | actual_value
------------|----------|---------|------------|------------|--------------|--------------|------|--------------
1 | 20 | 202410 | 600 | 7493 | | | 1 | 600
1 | 20 | 202411 | 17000 | 16431 | 600 | 7493 | 2 | 569
1 | 20 | 202412 | 785 | 24456 | 17600 | 23924 | 3 | 0
1 | 20 | 202501 | 0 | 25775 | 18385 | 48380 | 4 | 0
1 | 20 | 202502 | | 26131 | 18385 | 74155 | 5 |
1 | 20 | 202503 | 0 | 39452 | 18385 | 100286 | 6 | 0
1 | 20 | 202504 | | 38087 | 18385 | 139738 | 7 |
1 | 20 | 202505 | 2856 | 28916 | 18385 | 177825 | 8 | 0
1 | 20 | 202506 | 500000 | 42254 | 21241 | 206741 | 9 | 270849
1 | 20 | 202507 | | 36776 | 521241 | 248995 | 10 |
1 | 20 | 202508 | 660 | 23523 | 521241 | 285771 | 11 | 0
1 | 20 | 202509 | 1316000 | 25543 | 521901 | 309294 | 12 | 212787
1 | 20 | 202510 | 265220 | 30589 | 1837901 | 334837 | 13 | 0
1 | 20 | 202511 | 47580 | | 1864421 | 365426 | 14 | 0
现在,问题来了。因为我必须使用 pandas,所以代码需要很长时间才能处理更大的数据集。我需要找到一种方法在 Spark 本身中执行此操作,或者提高上述代码的效率。我已经考虑过对计算进行向量化,但我正在努力寻找一种有效的方法来计算排名 > 2 的行的累积实际值。
我尝试过以下方法:
window_spec = Window.partitionBy("item", "item_base").orderBy("date")
df = df.withColumn("rank", row_number().over(window_spec))
cumulative_window = window_spec.rowsBetween(Window.unboundedPreceding, -1)
df = (
df.withColumn("cumulative_1", _sum("quantity_1").over(cumulative_window))
.withColumn("cumulative_2", _sum("quantity_2").over(cumulative_window))
)
df = df.fillna({"cumulative_1": 0, "cumulative_2": 0, "quantity_1": 0, "quantity_2": 0})
pandas_df = df.toPandas()
pandas_df['actual_value'] = 0.0
for i in range(len(pandas_df)):
if pandas_df.at[i, 'rank'] == 1:
pandas_df.at[i, 'actual_value'] = pandas_df.at[i, 'quantity_1']
elif pandas_df.at[i, 'rank'] == 2:
pandas_df.at[i, 'actual_value'] = pandas_df.at[i, 'quantity_1'] - pandas_df.at[i, 'quantity_2']
else:
previous_actual_values = pandas_df.loc[
(pandas_df['item'] == pandas_df.at[i, 'item']) &
(pandas_df['item_base'] == pandas_df.at[i, 'item_base']) &
(pandas_df['date'] < pandas_df.at[i, 'date']), 'actual_value'
].sum()
pandas_df.at[i, 'actual_value'] = (
pandas_df.at[i, 'quantity_1'] -
pandas_df.at[i, 'cumulative_1'] -
pandas_df.at[i, 'cumulative_2'] -
previous_actual_values
)
if pandas_df.at[i, 'actual_value'] < 0:
pandas_df.at[i, 'actual_value'] = 0
print(pandas_df)
在上面的代码中,添加基于 item、item_base 和 order by date 的排名列,并计算 cumulative_1 和 cumulative_2 用 0 填充累积列和数量的空值,并转换为 pandas DataFrame 以实现递归逻辑。 初始化actual_value列并迭代计算actual_value
结果:
item item_base date quantity_1 quantity_2 rank cumulative_1
0 1 20 202410 600 7493 1 0
1 1 20 202411 17000 16431 2 600
2 1 20 202412 785 24456 3 17600
3 1 20 202501 0 25775 4 18385
4 1 20 202502 0 26131 5 18385
5 1 20 202503 0 39452 6 18385
6 1 20 202504 0 38087 7 18385
7 1 20 202505 2856 28916 8 18385
8 1 20 202506 500000 42254 9 21241
9 1 20 202507 0 36776 10 521241
10 1 20 202508 660 23523 11 521241
11 1 20 202509 1316000 25543 12 521901
12 1 20 202510 265220 30589 13 1837901
13 1 20 202511 47580 0 14 2103121
cumulative_2 actual_value
0 0 600.0
1 7493 569.0
2 23924 0.0
3 48380 0.0
4 74155 0.0
5 100286 0.0
6 139738 0.0
7 177825 0.0
8 206741 270849.0
9 248995 0.0
10 285771 0.0
11 309294 212787.0
12 334837 0.0
13 365426 0.0