我有一个带有单列的数据框,如下所示。
Type
'BAT'
'BAT'
'BALL'
'BAT'
'BALL'
'BALL'
在上面的数据框中,我添加了一个名为“const”的新列。
df = df.withColumn('const',F.lit(1))
如何在“const”列上使用 window.partionBy() 执行 cumsum 并创建新的 row_id 列?
预期输出
Type row_id
'BAT' 1
'BAT' 2
'BALL' 3
'BAT' 4
'BALL' 5
'BALL' 6
我也不想使用 RDD,由于性能原因,一切都应该在 Dataframe 中。
编辑
如果您只想要行索引而不考虑值,则使用:
df = df.withColumn('row_id',F.monotonically_increasing_id())
这将为每行创建一个 unic 索引。
如果您想考虑您的值,并且对重复值具有相同的索引,则使用排名:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window().partitionBy().orderBy("type")
df = df.withColumn('row_id',F.rank().over(w))
您当然可以使用 sum 或 row_number 实现相同的效果,但我认为上面的 2 种方法更好。
import sys
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window().partitionBy().orderBy().rowsBetween(-sys.maxsize,0)
df = df.withColumn('row_id',F.sum("const").over(w))
或
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window().partitionBy().orderBy("const")
df = df.withColumn('row_id',F.row_number().over(w))
如果您将窗口定义为按所有行中具有相同值的列进行排序,则直接使用 row_number() 函数可能会更改原始行顺序,在这种情况下,我将首先使用 monotically_increasing_id() 创建一个新列“row_order”保持原始行顺序(因为它会给你一个单调递增的数字)。然后使用 "row_number()" 函数并设置窗口按生成的列进行排序 "row_order",这是一个示例:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
df = df.withColumn('row_order',F.monotonically_increasing_id())
w = Window().partitionBy().orderBy("row_order")
df = df.withColumn('row_id',F.row_number().over(w))
df = df.drop("row_order")
这将确保您在应用窗口后在表中保留原始行顺序。