我需要将以下代码转换为 Pyspark。
我知道如何在 Pyspark 中创建数据框
df_stack_exchange
,但不知道如何在 Pyspark 中创建等效的 assign_boxes
函数。任何帮助将不胜感激
data_stack_exchange = {'store': ['A','B', 'B', 'C', 'C', 'C', 'D', 'D', 'D', 'D'],
'worker': [1,1,2,1,2,3,1,2,3,4],
'boxes': [105, 90, 100, 80, 10, 200, 70, 210, 50, 0],
'optimal_boxes': [0,0,0,0,0,0,0,0,0,0]}
df_stack_exchange = pandas.DataFrame(data_stack_exchange)
def assign_boxes(s):
total = s.sum()
d = min(total // 100, len(s)-1)
return [100]*d+[total - 100*d]+[0]*(len(s)-d-1)
df['optimal_boxes'] = df.groupby('store')['boxes'].transform(assign_boxes)
我已经通过官方 Pyspark 文档和 50 多个 stackexchange 线程阅读了 UDF,但无法弄清楚
下面的方法有效,但我没有使用函数:
w0=Window.partitionBy("store")
sp_df=sp_df.withColumn("row",row_number().over(w0.orderBy(monotonically_increasing_id())))
sp_df=sp_df.withColumn("optimal_boxes",lit("100"))
sp_df=sp_df.withColumn("sum_boxes",sum(col("boxes")).over(w0))
sp_df=sp_df.withColumn("optimal_boxes",when(col("row")==max(col("row")).over(w0),\
col("sum_boxes")-lag(sum(col("optimal_boxes")).over(w0.orderBy("row"))).over(w0.orderBy("row"))).otherwise(col("optimal_boxes")))
sp_df=sp_df.withColumn("optimal_boxes",when(col("optimal_boxes").isNull(),col("boxes")).otherwise(col("optimal_boxes")))
sp_df=sp_df.drop("sum_boxes","row")
sp_df.show()