如何在PySpark中的rowsBetween中使用unboundedPreceding、unboundedFollowing和currentRow

问题描述 投票:0回答:1

我对接受 pyspark.sql.Window.rowsBetween

Window.unboundedPreceding
Window.unboundedFollowing
对象作为
Window.currentRow
start
参数的方法
end
 有点困惑。您能否通过一些示例解释一下该函数的工作原理以及如何正确使用 
Window
对象?谢谢!

python pyspark group-by
1个回答
16
投票

顾名思义,行之间/范围之间有助于限制窗口内考虑的行数。

让我们举一个简单的例子。

从数据开始:

dfw=spark.createDataFrame([("abc",1,100),("abc",2,200),("abc",3,300),("abc",4,200),("abc",5,100)],"name string,id int,price int")

# output
+----+---+-----+
|name| id|price|
+----+---+-----+
| abc|  1|  100|
| abc|  2|  200|
| abc|  3|  300|
| abc|  4|  200|
| abc|  5|  100|
+----+---+-----+

现在通过这些数据,让我们尝试找到运行最大值,即每行的最大值:

dfw.withColumn("rm",F.max("price").over(Window.partitionBy("name").orderBy("id"))).show()

#output
+----+---+-----+---+
|name| id|price| rm|
+----+---+-----+---+
| abc|  1|  100|100|
| abc|  2|  200|200|
| abc|  3|  300|300|
| abc|  4|  200|300|
| abc|  5|  100|300|
+----+---+-----+---+

因此,正如预期的那样,它从上到下逐一查看每个价格,并填充它得到的最大值,这种行为被称为

start = Window.unboundedPreceding
end = Window.currentRow

现在将值之间的行更改为

start = Window.unboundedPreceding
end = Window.unboundedFollowing
,我们将得到如下结果:

dfw.withColumn("rm",F.max("price").over(Window.partitionBy("name").orderBy("id").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing))).show()

#output
+----+---+-----+---+
|name| id|price| rm|
+----+---+-----+---+
| abc|  1|  100|300|
| abc|  2|  200|300|
| abc|  3|  300|300|
| abc|  4|  200|300|
| abc|  5|  100|300|
+----+---+-----+---+

现在,正如您在同一窗口中看到的那样,它向下查找所有值的最大值,而不是将其限制为当前行。

现在第三个将是

start = Window.currentRow
end  = Window.unboundedFollowing

dfw.withColumn("rm",F.max("price").over(Window.partitionBy("name").orderBy("id").rowsBetween(Window.currentRow,Window.unboundedFollowing))).show()

#output
+----+---+-----+---+
|name| id|price| rm|
+----+---+-----+---+
| abc|  1|  100|300|
| abc|  2|  200|300|
| abc|  3|  300|300|
| abc|  4|  200|200|
| abc|  5|  100|100|
+----+---+-----+---+

现在它只向下查找从当前行开始的最大值。

此外,它不限于按原样使用这 3 个,您甚至可以使用

start = Window.currentRow-1
end = Window.currentRow+1
,因此不必查找上方或下方的所有值,它只会查看上方 1 行和下方 1 行。

像这样:

dfw.withColumn("rm",F.max("price").over(Window.partitionBy("name").orderBy("id").rowsBetween(Window.currentRow-1,Window.currentRow+1))).show()

# output
+----+---+-----+---+
|name| id|price| rm|
+----+---+-----+---+
| abc|  1|  100|200|
| abc|  2|  200|300|
| abc|  3|  300|300|
| abc|  4|  200|300|
| abc|  5|  100|200|
+----+---+-----+---+

所以你可以想象它是窗口内的一个窗口,围绕它正在处理的当前行工作。

© www.soinside.com 2019 - 2024. All rights reserved.