如何使用window.partionBy()为Spark数据帧创建row_index?

问题描述 投票:0回答:2

我有一个带有单列的数据框,如下所示。

Type
'BAT'
'BAT'
'BALL'
'BAT'
'BALL'
'BALL'

在上面的数据框中,我添加了一个名为“const”的新列。

df = df.withColumn('const',F.lit(1))

如何在“const”列上使用 window.partionBy() 执行 cumsum 并创建新的 row_id 列?

预期输出

Type  row_id
'BAT'   1
'BAT'   2
'BALL'  3
'BAT'   4
'BALL'  5
'BALL'  6

我也不想使用 RDD,由于性能原因,一切都应该在 Dataframe 中。

编辑

  • 我希望行ID增加+1
  • 由于上述原因不想使用 monotonically_increasing() 函数
apache-spark pyspark apache-spark-sql
2个回答
2
投票

如果您只想要行索引而不考虑值,则使用:

df = df.withColumn('row_id',F.monotonically_increasing_id())

这将为每行创建一个 unic 索引。

如果您想考虑您的值,并且对重复值具有相同的索引,则使用排名:

from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window().partitionBy().orderBy("type")
df = df.withColumn('row_id',F.rank().over(w))

您当然可以使用 sum 或 row_number 实现相同的效果,但我认为上面的 2 种方法更好。

import sys
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window().partitionBy().orderBy().rowsBetween(-sys.maxsize,0)
df = df.withColumn('row_id',F.sum("const").over(w))

from pyspark.sql import functions as F
from pyspark.sql.window import Window
w = Window().partitionBy().orderBy("const")
df = df.withColumn('row_id',F.row_number().over(w))

0
投票

如果您将窗口定义为按所有行中具有相同值的列进行排序,则直接使用 row_number() 函数可能会更改原始行顺序,在这种情况下,我将首先使用 monotically_increasing_id() 创建一个新列“row_order”保持原始行顺序(因为它会给你一个单调递增的数字)。然后使用 "row_number()" 函数并设置窗口按生成的列进行排序 "row_order",这是一个示例:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = df.withColumn('row_order',F.monotonically_increasing_id())
w = Window().partitionBy().orderBy("row_order")
df = df.withColumn('row_id',F.row_number().over(w))
df = df.drop("row_order")

这将确保您在应用窗口后在表中保留原始行顺序。

© www.soinside.com 2019 - 2024. All rights reserved.