如何优化Azure Synapse Spark笔记本中大数据集的累积和字段计算?

问题描述 投票:0回答:1

我有一个包含 300 万多条记录和几列的数据集。这是我的数据集的示例:

项目 item_base 日期 数量_1 数量_2
1 20 202410 600 7493
1 20 202411 17000 16431

每个

item-item_base-date
都会生成一个唯一的密钥。我需要根据以下逻辑计算新列“actual_value”:

假设我们对表中的 item-item_base 进行排名操作并按日期排序,那么,
对于

rank = 1
actual_value = quantity_1

对于
rank = 2
actual_value = quantity_1 - quantity_2

对于
rank > 2
actual_value = quantity_1 - sum(all prev quantity_1) - sum(all prev quantity_2) - sum(all prev actual_value)

这是我解决这个问题的方法:

首先,我创建 2 个附加列

cumulative_1
cumulative_2
,它们基本上是使用 SQL 窗口的数量_1 和数量_2 的总和。
SUM(quantity_1) OVER(PARTITION BY item, item_base ORDER BY date ROWS BETWEEN UNBOUNDED PRECEIDING AND 1 PRECEDING) as cumulative_1
等等。另外,我正在创建一个排名列作为 row_no 标识符。

Spark 不支持递归 CTE,因此实现

sum(all prev actual_value)
非常繁琐。我不得不切换到 pandas dataframe 来完成计算。这是我的代码:

my_df = df.toPandas() 
my_df['actual_value'] = 0.0

for i in range(len(my_df)):
    if my_df.at[i, 'rank'] == 1:
        my_df.at[i, 'actual_value'] = my_df.at[i, 'quantity_1']
    elif my_df.at[i, 'rank'] == 2:
        my_df.at[i, 'actual_value'] = my_df.at[i, 'quantity_1'] - my_df.at[i, 'quantity_2']
    else:
        previous_actual_values = my_df.loc[(my_df['item'] == my_df.at[i, 'item']) & 
                                           (my_df['item_base'] == my_df.at[i, 'item_base']) & 
                                           (my_df['date'] < my_df.at[i, 'date']), 'actual_value'].sum()

        my_df.at[i, 'actual_value'] = my_df.at[i, 'quantity_1'] - my_df.at[i, 'cumulative_2'] - my_df.at[i, 'cumulative_1'] - previous_actual_values

    if my_df.at[i, 'actual_value'] < 0:
        my_df.at[i, 'actual_value'] = 0

代码完成了工作并给了我正确的输出。

item        | item_base| date    | quantity_1 | quantity_2 | cumulative_1 | cumulative_2 | rank | actual_value
------------|----------|---------|------------|------------|--------------|--------------|------|--------------
1           | 20       | 202410  | 600        | 7493       |              |              | 1    | 600
1           | 20       | 202411  | 17000      | 16431      | 600          | 7493         | 2    | 569
1           | 20       | 202412  | 785        | 24456      | 17600        | 23924        | 3    | 0
1           | 20       | 202501  | 0          | 25775      | 18385        | 48380        | 4    | 0
1           | 20       | 202502  |            | 26131      | 18385        | 74155        | 5    | 
1           | 20       | 202503  | 0          | 39452      | 18385        | 100286       | 6    | 0
1           | 20       | 202504  |            | 38087      | 18385        | 139738       | 7    | 
1           | 20       | 202505  | 2856       | 28916      | 18385        | 177825       | 8    | 0
1           | 20       | 202506  | 500000     | 42254      | 21241        | 206741       | 9    | 270849
1           | 20       | 202507  |            | 36776      | 521241       | 248995       | 10   | 
1           | 20       | 202508  | 660        | 23523      | 521241       | 285771       | 11   | 0
1           | 20       | 202509  | 1316000    | 25543      | 521901       | 309294       | 12   | 212787
1           | 20       | 202510  | 265220     | 30589      | 1837901      | 334837       | 13   | 0
1           | 20       | 202511  | 47580      |            | 1864421      | 365426       | 14   | 0

现在,问题来了。因为我必须使用 pandas,所以代码需要很长时间才能处理更大的数据集。我需要找到一种方法在 Spark 本身中执行此操作,或者提高上述代码的效率。我已经考虑过对计算进行向量化,但我正在努力寻找一种有效的方法来计算排名 > 2 的行的累积实际值。

编辑:我无法修复输出表的格式,这里是输出的屏幕截图: enter image description here

python pandas apache-spark apache-spark-sql azure-synapse
1个回答
0
投票

我尝试过以下方法:

window_spec = Window.partitionBy("item", "item_base").orderBy("date")
df = df.withColumn("rank", row_number().over(window_spec))
cumulative_window = window_spec.rowsBetween(Window.unboundedPreceding, -1)
df = (
    df.withColumn("cumulative_1", _sum("quantity_1").over(cumulative_window))
      .withColumn("cumulative_2", _sum("quantity_2").over(cumulative_window))
)
df = df.fillna({"cumulative_1": 0, "cumulative_2": 0, "quantity_1": 0, "quantity_2": 0})
pandas_df = df.toPandas()
pandas_df['actual_value'] = 0.0
for i in range(len(pandas_df)):
    if pandas_df.at[i, 'rank'] == 1:
        pandas_df.at[i, 'actual_value'] = pandas_df.at[i, 'quantity_1']
    elif pandas_df.at[i, 'rank'] == 2:
        pandas_df.at[i, 'actual_value'] = pandas_df.at[i, 'quantity_1'] - pandas_df.at[i, 'quantity_2']
    else:
        previous_actual_values = pandas_df.loc[
            (pandas_df['item'] == pandas_df.at[i, 'item']) &
            (pandas_df['item_base'] == pandas_df.at[i, 'item_base']) &
            (pandas_df['date'] < pandas_df.at[i, 'date']), 'actual_value'
        ].sum()

        pandas_df.at[i, 'actual_value'] = (
            pandas_df.at[i, 'quantity_1'] - 
            pandas_df.at[i, 'cumulative_1'] - 
            pandas_df.at[i, 'cumulative_2'] - 
            previous_actual_values
        )
    if pandas_df.at[i, 'actual_value'] < 0:
        pandas_df.at[i, 'actual_value'] = 0
print(pandas_df)

在上面的代码中,添加基于 item、item_base 和 order by date 的排名列,并计算 cumulative_1cumulative_2 用 0 填充累积列和数量的空值,并转换为 pandas DataFrame 以实现递归逻辑。 初始化actual_value列并迭代计算actual_value

结果:

item  item_base    date  quantity_1  quantity_2  rank  cumulative_1  
0      1         20  202410         600        7493     1             0   
1      1         20  202411       17000       16431     2           600   
2      1         20  202412         785       24456     3         17600   
3      1         20  202501           0       25775     4         18385   
4      1         20  202502           0       26131     5         18385   
5      1         20  202503           0       39452     6         18385   
6      1         20  202504           0       38087     7         18385   
7      1         20  202505        2856       28916     8         18385   
8      1         20  202506      500000       42254     9         21241   
9      1         20  202507           0       36776    10        521241   
10     1         20  202508         660       23523    11        521241   
11     1         20  202509     1316000       25543    12        521901   
12     1         20  202510      265220       30589    13       1837901   
13     1         20  202511       47580           0    14       2103121   

    cumulative_2  actual_value  
0              0         600.0  
1           7493         569.0  
2          23924           0.0  
3          48380           0.0  
4          74155           0.0  
5         100286           0.0  
6         139738           0.0  
7         177825           0.0  
8         206741      270849.0  
9         248995           0.0  
10        285771           0.0  
11        309294      212787.0  
12        334837           0.0  
13        365426           0.0  



© www.soinside.com 2019 - 2024. All rights reserved.