Consider the following sample dataset.
Date | Symbol | Qty | Price per qty | Type |
---|---|---|---|---|
07 July 2022 | REL2300PE | 200 | 50 | buy |
07 July 2022 | IDBI2300PE | 200 | 50 | sell |
15 July 2022 | REL2300PE | 100 | 50 | buy |
15 July 2022 | IDBI2300PE | 20 | 50 | buy |
16 July 2022 | REL2300PE | 200 | 35 | buy |
30 July 2022 | IDBI2300PE | 60 | 50 | sell |
30 July 2022 | REL2300PE | 450 | 45 | sell |
30 July 2022 | IDBI2300PE | 200 | 25 | sell |
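If we focus on the symbol "REL2300PE", 450 units were sold for a total sale value of $20,250 (450 units * $45). Now let's calculate the cost price of the first 450 units of this stock. The cost price is obtained by going through the buy transactions for this stock in date order (first in, first out), multiplying quantity by unit price, and taking only as many units as were sold. In this case, the cost price of the first 450 units is: 200 units * $50 + 100 units * $50 + 150 units * $35 = $20,250. Since the sale value and the cost price of the first 450 units are the same ($20,250), the profit/loss for this stock should be 0.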
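The FIFO matching described above can be checked with a small plain-Python sketch. It assumes the buy lots are listed in date order and that no more units are sold than were bought; `fifo_cost` is a hypothetical helper written for illustration, not part of any library.

```python
# Buy lots for REL2300PE from the sample table, in date order: (qty, price_per_qty)
buys = [(200, 50), (100, 50), (200, 35)]
sold_qty = 450
sell_price = 45


def fifo_cost(lots, qty_to_match):
    """Cost of the first `qty_to_match` units, consuming buy lots oldest first."""
    cost = 0
    remaining = qty_to_match
    for lot_qty, lot_price in lots:
        if remaining == 0:
            break
        take = min(lot_qty, remaining)  # use only as much of this lot as still needs matching
        cost += take * lot_price
        remaining -= take
    return cost


sale_value = sold_qty * sell_price          # 450 * 45
cost = fifo_cost(buys, sold_qty)            # 200*50 + 100*50 + 150*35
print(sale_value, cost, sale_value - cost)  # 20250 20250 0
```

Only 150 of the 200 units bought on 16 July are matched, which is why the cost equals the sale value.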
You can follow this algorithm:
Input:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, when, sum  # note: this shadows Python's built-in sum

spark = SparkSession.builder.getOrCreate()

# Dates are kept as strings: ordering by them works here only because every row
# is in July 2022 with a zero-padded day. For real data, convert to a date type
# (e.g. with to_date) before ordering.
data = [
    ('05 July 2022', 'IDBI2300PE', 500, 45, 'buy'),
    ('07 July 2022', 'REL2300PE', 200, 50, 'buy'),
    ('07 July 2022', 'IDBI2300PE', 200, 50, 'sell'),
    ('15 July 2022', 'REL2300PE', 100, 50, 'buy'),
    ('15 July 2022', 'IDBI2300PE', 20, 50, 'buy'),
    ('16 July 2022', 'REL2300PE', 200, 35, 'buy'),
    ('20 July 2022', 'REL2300PE', 200, 45, 'sell'),
    ('30 July 2022', 'IDBI2300PE', 60, 50, 'sell'),
    ('30 July 2022', 'REL2300PE', 250, 45, 'sell'),
    ('31 July 2022', 'IDBI2300PE', 200, 25, 'sell')]
df = spark.createDataFrame(data, ["Date", "symbol", "qty", "price", "type"])
Calculate the consumed quantity:
# Running net position per symbol: buys add qty, sells subtract it (current and all previous rows)
cum_sum_wind = Window.partitionBy('symbol').orderBy('Date').rangeBetween(Window.unboundedPreceding, 0)
df = df.withColumn('cum_sum', sum(when(col('type') == 'sell', -1 * col('qty'))
                                  .otherwise(col('qty'))).over(cum_sum_wind))

# Total quantity sold in the current and all following rows
sell_cum_sum_wind = Window.partitionBy('symbol').orderBy('Date').rangeBetween(0, Window.unboundedFollowing)
df = df.withColumn('sell_cum_sum', sum(when(col('type') == 'sell', col('qty'))
                                       .otherwise(0)).over(sell_cum_sum_wind))

# Calculate the actual consumed qty (FIFO):
# - a sell row consumes its full qty
# - a buy row is fully consumed if later sells exceed the position built up so far
# - otherwise only the part not left over (cum_sum - sell_cum_sum) is consumed
df = df.withColumn('cons_qty', when(col('type') == 'sell', col('qty'))
                   .when(col('sell_cum_sum') > col('cum_sum'), col('qty'))
                   # If negative then nothing is consumed from this row
                   .when((col('qty') - (col('cum_sum') - col('sell_cum_sum'))) < 0, 0)
                   .otherwise(col('qty') - (col('cum_sum') - col('sell_cum_sum'))))
df.show()
+------------+----------+---+-----+----+-------+------------+--------+
|        Date|    symbol|qty|price|type|cum_sum|sell_cum_sum|cons_qty|
+------------+----------+---+-----+----+-------+------------+--------+
|05 July 2022|IDBI2300PE|500|   45| buy|    500|         460|     460|
|07 July 2022|IDBI2300PE|200|   50|sell|    300|         460|     200|
|15 July 2022|IDBI2300PE| 20|   50| buy|    320|         260|       0|
|30 July 2022|IDBI2300PE| 60|   50|sell|    260|         260|      60|
|31 July 2022|IDBI2300PE|200|   25|sell|     60|         200|     200|
|07 July 2022| REL2300PE|200|   50| buy|    200|         450|     200|
|15 July 2022| REL2300PE|100|   50| buy|    300|         450|     100|
|16 July 2022| REL2300PE|200|   35| buy|    500|         450|     150|
|20 July 2022| REL2300PE|200|   45|sell|    300|         450|     200|
|30 July 2022| REL2300PE|250|   45|sell|     50|         250|     250|
+------------+----------+---+-----+----+-------+------------+--------+
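To see why the cumulative-sum rule yields FIFO consumption, here is a plain-Python sketch that reproduces the cum_sum, sell_cum_sum and cons_qty columns for a single symbol. `consumed_qty` is a hypothetical helper; it assumes the rows are already sorted by date and that sells never exceed the quantity held.

```python
def consumed_qty(rows):
    """Return cons_qty for each (qty, type) row of ONE symbol, rows sorted by date.

    Mirrors the Spark columns cum_sum, sell_cum_sum and cons_qty.
    """
    signed = [qty if typ == 'buy' else -qty for qty, typ in rows]
    sells = [qty if typ == 'sell' else 0 for qty, typ in rows]
    result = []
    for i, (qty, typ) in enumerate(rows):
        cum_sum = sum(signed[: i + 1])   # net position up to and including this row
        sell_cum_sum = sum(sells[i:])    # sells in this row and all later rows
        if typ == 'sell':
            result.append(qty)           # a sell consumes its full quantity
        elif sell_cum_sum > cum_sum:
            result.append(qty)           # buy lot fully used up by later sells
        else:
            # For a buy row, cum_sum - sell_cum_sum = units bought so far that are never sold
            result.append(max(qty - (cum_sum - sell_cum_sum), 0))
    return result


idbi = [(500, 'buy'), (200, 'sell'), (20, 'buy'), (60, 'sell'), (200, 'sell')]
rel = [(200, 'buy'), (100, 'buy'), (200, 'buy'), (200, 'sell'), (250, 'sell')]
print(consumed_qty(idbi))  # [460, 200, 0, 60, 200]
print(consumed_qty(rel))   # [200, 100, 150, 200, 250]
```

For a buy row, the sells before it plus the sells from it onward add up to the symbol's total sells, so cum_sum - sell_cum_sum is "units bought up to this row minus total units sold": the leftover that FIFO never touches.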
Calculate the overall profit/loss for each symbol:
# Groupby symbol and calculate the profit/loss
result = df.groupby('symbol').agg(
    sum(when(col('type') == 'buy', -1 * col('price') * col('cons_qty'))
        .otherwise(col('price') * col('cons_qty'))).alias("profit_loss"))
result.show()
+----------+-----------+
|    symbol|profit_loss|
+----------+-----------+
|IDBI2300PE|      -2700|
| REL2300PE|          0|
+----------+-----------+
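The aggregation step is a signed sum of price * cons_qty per symbol. A plain-Python check using the price, type and cons_qty values from the table above (`profit_loss` is a hypothetical helper):

```python
def profit_loss(rows):
    """rows: (price, type, cons_qty); sells add price * cons_qty, buys subtract it."""
    return sum(price * cons if typ == 'sell' else -price * cons
               for price, typ, cons in rows)


idbi = [(45, 'buy', 460), (50, 'sell', 200), (50, 'buy', 0), (50, 'sell', 60), (25, 'sell', 200)]
rel = [(50, 'buy', 200), (50, 'buy', 100), (35, 'buy', 150), (45, 'sell', 200), (45, 'sell', 250)]
print(profit_loss(idbi), profit_loss(rel))  # -2700 0
```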
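Use withColumn to add a new column total_price = qty * price_per_qty, multiplied by -1 if the type is "buy" and by 1 if the type is "sell", then group by symbol and sum total_price: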
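from pyspark.sql.functions import when, sum
# Buys are money out (negative), sells are money in (positive)
df = df.withColumn('total_price', when(df.type == 'buy', df.qty * df.price_per_qty * (-1)).otherwise(df.qty * df.price_per_qty))
df = df.groupBy('symbol').agg(sum('total_price').alias('profit_loss'))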
Please try to follow the concept. I don't have my laptop with me right now, so there may be syntax errors in the code.
Thanks
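One thing to keep in mind with this approach: it sums every trade at its full quantity, so units that are still held show up as a loss. A plain-Python check on the REL2300PE rows from the question's sample table, assuming the price column is the price per unit:

```python
# REL2300PE rows from the question's sample table: (qty, price per unit, type)
rel = [(200, 50, 'buy'), (100, 50, 'buy'), (200, 35, 'buy'), (450, 45, 'sell')]

# Same rule as the total_price column: buys negative, sells positive, summed per symbol
naive = sum(-qty * price if typ == 'buy' else qty * price for qty, price, typ in rel)
print(naive)  # -1750
```

The -1750 is exactly the cost of the 50 units from the 16 July lot that were never sold (50 * 35), whereas the FIFO matching described in the question gives 0.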
If you want to solve this with Hive, you can use the query below to get the required result:
select symbol
      ,total_profit
from (
    select `date`
          ,symbol
          ,sum(qty * (CASE
                        WHEN type = 'sell' THEN price_per_qty
                        ELSE (price_per_qty * (-1))
                      END
                     )
              ) over (partition by symbol order by `date` rows between unbounded preceding and current row) as total_profit
          ,row_number() over (partition by symbol order by `date` desc) as rn
    from stock1
) as running_sum
where rn = 1;
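The running sum combined with rn = 1 keeps, for each symbol, the running total at its latest row, which is simply the total over all of that symbol's rows. A plain-Python illustration with the REL2300PE rows from the question's table, counting sells as positive and buys as negative; `itertools.accumulate` stands in for the windowed SUM here:

```python
from itertools import accumulate

# REL2300PE rows in date order, as signed amounts: buys -qty*price, sells +qty*price
amounts = [-200 * 50, -100 * 50, -200 * 35, 450 * 45]

running = list(accumulate(amounts))  # like SUM(...) OVER (ORDER BY date ROWS UNBOUNDED PRECEDING)
last = running[-1]                   # like keeping only rn = 1 under ORDER BY date DESC
print(running, last)  # [-10000, -15000, -22000, -1750] -1750
```

So the same number could be obtained with a plain GROUP BY symbol; like the withColumn approach, this query does not apply FIFO matching to unsold units.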
@Chris Maurer, could you provide an SQL query for the above problem?