Creating a current balance column in PySpark

Problem description

I have created the following dataframe in my PySpark code:

+---------------+-------------+---------------+------+
|TransactionDate|AccountNumber|TransactionType|Amount|
+---------------+-------------+---------------+------+
|     2023-01-01|          100|         Credit|  1000|
|     2023-01-02|          100|         Credit|  1500|
|     2023-01-03|          100|          Debit|  1000|
|     2023-01-02|          200|         Credit|  3500|
|     2023-01-03|          200|          Debit|  2000|
|     2023-01-04|          200|         Credit|  3500|
|     2023-01-13|          300|         Credit|  4000|
|     2023-01-14|          300|          Debit|  4500|
|     2023-01-15|          300|         Credit|  5000|
+---------------+-------------+---------------+------+

I need to produce an additional column, CurrentBalance.

Expected output:

+---------------+-------------+---------------+------+--------------+
|TransactionDate|AccountNumber|TransactionType|Amount|CurrentBalance|
+---------------+-------------+---------------+------+--------------+
|     2023-01-01|          100|         Credit|  1000|          1000|
|     2023-01-02|          100|         Credit|  1500|          2500|
|     2023-01-03|          100|          Debit|  1000|          1500|
|     2023-01-02|          200|         Credit|  3500|          3500|
|     2023-01-03|          200|          Debit|  2000|          1500|
|     2023-01-04|          200|         Credit|  3500|          5000|
|     2023-01-13|          300|         Credit|  4000|          4000|
|     2023-01-14|          300|          Debit|  4500|          -500|
|     2023-01-15|          300|         Credit|  5000|          4500|
+---------------+-------------+---------------+------+--------------+

I tried finding the minimum date per account and passing it into a when condition to work out the credits and debits, but it doesn't seem to work.

from pyspark.sql import functions as f

# Find the minimum date in the TransactionDate column, grouped by AccountNumber
min_dates = df_new.groupBy('AccountNumber').agg(f.min('TransactionDate').alias('min_date'))
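
For context, here is a rough sketch of where that aggregate might have been heading, assuming the idea was to join the per-account minimum date back onto df_new and flag each account's opening transaction with a when condition (the df_flagged and is_first_txn names are purely illustrative). Even completed this way, it only marks the first row per account and never accumulates a balance across the later rows, which is why the answer below uses a window function instead.

# Join the per-account minimum date back and flag the opening transaction.
# Note: this still does not accumulate a balance row by row.
df_flagged = (
    df_new
    .join(min_dates, on='AccountNumber', how='left')
    .withColumn(
        'is_first_txn',
        f.when(f.col('TransactionDate') == f.col('min_date'), 1).otherwise(0)
    )
)
df_flagged.show()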
Tags: dataframe, apache-spark, pyspark, aggregate, apache-synapse
1 Answer

You will need a window function for this. A window function is evaluated for every row within its partition (group); in this case, you need a running sum of the amounts, row by row.

A plain running sum is not enough, though, because there are no negative amounts in the data, so you first have to derive the sign of each row from the 'TransactionType' column.

Sample data:

from pyspark.sql import functions as F, Window as W

df = spark.createDataFrame(
    [('2023-01-01', '100', 'Credit', 1000),
     ('2023-01-02', '100', 'Credit', 1500),
     ('2023-01-03', '100', 'Debit', 1000),
     ('2023-01-02', '200', 'Credit', 3500),
     ('2023-01-03', '200', 'Debit', 2000),
     ('2023-01-04', '200', 'Credit', 3500),
     ('2023-01-13', '300', 'Credit', 4000),
     ('2023-01-14', '300', 'Debit', 4500),
     ('2023-01-15', '300', 'Credit', 5000)],
    ['TransactionDate', 'AccountNumber', 'TransactionType', 'Amount'])

Script:

# +1 for credits, -1 for debits
sign = F.when(F.col('TransactionType') == 'Debit', -1).otherwise(1)
amount = sign * F.col('Amount')

# Running total per account, ordered by transaction date
window = W.partitionBy('AccountNumber').orderBy('TransactionDate')

df = df.withColumn('CurrentBalance', F.sum(amount).over(window))

df.show()
# +---------------+-------------+---------------+------+--------------+
# |TransactionDate|AccountNumber|TransactionType|Amount|CurrentBalance|
# +---------------+-------------+---------------+------+--------------+
# |     2023-01-01|          100|         Credit|  1000|          1000|
# |     2023-01-02|          100|         Credit|  1500|          2500|
# |     2023-01-03|          100|          Debit|  1000|          1500|
# |     2023-01-02|          200|         Credit|  3500|          3500|
# |     2023-01-03|          200|          Debit|  2000|          1500|
# |     2023-01-04|          200|         Credit|  3500|          5000|
# |     2023-01-13|          300|         Credit|  4000|          4000|
# |     2023-01-14|          300|          Debit|  4500|          -500|
# |     2023-01-15|          300|         Credit|  5000|          4500|
# +---------------+-------------+---------------+------+--------------+
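
One caveat worth noting: with only orderBy, Spark uses the default frame RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so if an account ever had two transactions on the same TransactionDate, both rows would receive the same cumulative value. A small variant, assuming you want a strictly row-by-row balance in that case, is to spell out a row-based frame with rowsBetween:

# Explicit row-based frame: accumulate one row at a time, even when
# two transactions share the same TransactionDate within an account.
window = (
    W.partitionBy('AccountNumber')
     .orderBy('TransactionDate')
     .rowsBetween(W.unboundedPreceding, W.currentRow)
)

df = df.withColumn('CurrentBalance', F.sum(amount).over(window))

Keep in mind that when dates do tie, the order of the tied rows is not guaranteed, so a fully deterministic running balance would also need a tiebreaker column in the orderBy.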