I created the following DataFrame in my PySpark code:
+---------------+-------------+---------------+------+
|TransactionDate|AccountNumber|TransactionType|Amount|
+---------------+-------------+---------------+------+
| 2023-01-01| 100| Credit| 1000|
| 2023-01-02| 100| Credit| 1500|
| 2023-01-03| 100| Debit| 1000|
| 2023-01-02| 200| Credit| 3500|
| 2023-01-03| 200| Debit| 2000|
| 2023-01-04| 200| Credit| 3500|
| 2023-01-13| 300| Credit| 4000|
| 2023-01-14| 300| Debit| 4500|
| 2023-01-15| 300| Credit| 5000|
+---------------+-------------+---------------+------+
I need to add another column, CurrentBalance, that holds the running balance of each account.
Expected output:
+---------------+-------------+---------------+------+--------------+
|TransactionDate|AccountNumber|TransactionType|Amount|CurrentBalance|
+---------------+-------------+---------------+------+--------------+
| 2023-01-01| 100| Credit| 1000| 1000|
| 2023-01-02| 100| Credit| 1500| 2500|
| 2023-01-03| 100| Debit| 1000| 1500|
| 2023-01-02| 200| Credit| 3500| 3500|
| 2023-01-03| 200| Debit| 2000| 1500|
| 2023-01-04| 200| Credit| 3500| 5000|
| 2023-01-13| 300| Credit| 4000| 4000|
| 2023-01-14| 300| Debit| 4500| -500|
| 2023-01-15| 300| Credit| 5000| 4500|
+---------------+-------------+---------------+------+--------------+
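I tried finding the minimum date per account and passing dates into a when condition to compute the credits and debits, but it doesn't seem to work: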
from pyspark.sql import functions as f
# Find minimum date in TransactionDate column, grouped by AccountNumber column
df_new.groupBy('AccountNumber').agg(f.min('TransactionDate').alias('min_date'))
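You'll need a window function for this. A window function computes a value for every row from the rows of its partition (group). Here you need a running sum of the amounts, accumulated row by row within each account.
Also, a plain sum won't work because Amount contains no negative numbers, so you have to use the value in the 'TransactionType' column to turn debits into negative amounts before summing.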
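To make the logic concrete, here is a minimal plain-Python sketch (no Spark involved) of what the window computes: group the rows by AccountNumber, order each group by TransactionDate, negate debits, and take a cumulative sum within the group. The function name running_balance and the tuple layout are assumptions for illustration only, and the sketch assumes each account has at most one transaction per date.

```python
from itertools import accumulate, groupby


def running_balance(rows):
    """rows: (TransactionDate, AccountNumber, TransactionType, Amount) tuples.

    Returns the rows with a CurrentBalance value appended, sorted by
    account and then by date (hypothetical helper, not a Spark API).
    """
    # Like partitionBy('AccountNumber').orderBy('TransactionDate'):
    # sort by account, then by date ('yyyy-MM-dd' strings sort correctly)
    ordered = sorted(rows, key=lambda r: (r[1], r[0]))
    result = []
    for _, group in groupby(ordered, key=lambda r: r[1]):
        group = list(group)
        # Debits count as negative amounts, credits as positive
        signed = [-amt if ttype == 'Debit' else amt for _, _, ttype, amt in group]
        # Cumulative sum restarts for every account
        for row, balance in zip(group, accumulate(signed)):
            result.append(row + (balance,))
    return result


sample = [
    ('2023-01-01', '100', 'Credit', 1000),
    ('2023-01-02', '100', 'Credit', 1500),
    ('2023-01-03', '100', 'Debit', 1000),
    ('2023-01-02', '200', 'Credit', 3500),
    ('2023-01-03', '200', 'Debit', 2000),
    ('2023-01-04', '200', 'Credit', 3500),
    ('2023-01-13', '300', 'Credit', 4000),
    ('2023-01-14', '300', 'Debit', 4500),
    ('2023-01-15', '300', 'Credit', 5000),
]

for row in running_balance(sample):
    print(row)
```

On the sample rows the last column comes out as 1000, 2500, 1500, 3500, 1500, 5000, 4000, -500, 4500.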
Sample data:
from pyspark.sql import SparkSession, functions as F, Window as W

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[('2023-01-01', '100', 'Credit', 1000),
('2023-01-02', '100', 'Credit', 1500),
('2023-01-03', '100', 'Debit', 1000),
('2023-01-02', '200', 'Credit', 3500),
('2023-01-03', '200', 'Debit', 2000),
('2023-01-04', '200', 'Credit', 3500),
('2023-01-13', '300', 'Credit', 4000),
('2023-01-14', '300', 'Debit', 4500),
('2023-01-15', '300', 'Credit', 5000)],
['TransactionDate', 'AccountNumber', 'TransactionType', 'Amount'])
Script:
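# +1 for credits, -1 for debits
sign = F.when(F.col('TransactionType') == 'Debit', -1).otherwise(1)
amount = sign * F.col('Amount')
# One partition per account, rows ordered by date; with orderBy, the default
# frame runs from the first row of the partition up to and including the current row
window = W.partitionBy('AccountNumber').orderBy('TransactionDate')
# Running sum of the signed amounts = balance after each transaction
df = df.withColumn('CurrentBalance', F.sum(amount).over(window))
df.show()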
# +---------------+-------------+---------------+------+--------------+
# |TransactionDate|AccountNumber|TransactionType|Amount|CurrentBalance|
# +---------------+-------------+---------------+------+--------------+
# | 2023-01-01| 100| Credit| 1000| 1000|
# | 2023-01-02| 100| Credit| 1500| 2500|
# | 2023-01-03| 100| Debit| 1000| 1500|
# | 2023-01-02| 200| Credit| 3500| 3500|
# | 2023-01-03| 200| Debit| 2000| 1500|
# | 2023-01-04| 200| Credit| 3500| 5000|
# | 2023-01-13| 300| Credit| 4000| 4000|
# | 2023-01-14| 300| Debit| 4500| -500|
# | 2023-01-15| 300| Credit| 5000| 4500|
# +---------------+-------------+---------------+------+--------------+
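One caveat worth checking against real data: when a window has orderBy but no explicit frame, Spark uses a RANGE frame from unbounded preceding to the current row, so two transactions of the same account on the same date are peers and both get the balance after both have been applied. The plain-Python sketch below (hypothetical helpers and hypothetical data, not Spark code) contrasts that RANGE behaviour with a ROWS frame on values already sorted by date.

```python
from itertools import groupby


def rows_frame_sum(values):
    """ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:
    each row adds only its own amount to the running total."""
    total, out = 0, []
    for _, amount in values:
        total += amount
        out.append(total)
    return out


def range_frame_sum(values):
    """RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:
    rows sharing the same ordering key (peers) all see a total
    that already includes every peer."""
    total, out = 0, []
    for _, peers in groupby(values, key=lambda kv: kv[0]):
        peers = list(peers)
        total += sum(amount for _, amount in peers)
        out.extend([total] * len(peers))
    return out


# Hypothetical account with two transactions on the same date
values = [('2023-01-01', 1000), ('2023-01-01', -400), ('2023-01-02', 200)]
print(rows_frame_sum(values))   # [1000, 600, 800]
print(range_frame_sum(values))  # [600, 600, 800]
```

If a strictly row-by-row balance is wanted in PySpark, the frame can be set explicitly with W.partitionBy('AccountNumber').orderBy('TransactionDate').rowsBetween(W.unboundedPreceding, W.currentRow); adding a unique tiebreaker column to orderBy (assuming the data has one) also keeps the order of same-date rows deterministic.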