所以我已经阅读了这个综合材料,但我不明白为什么窗口函数会这样。
这是一个小例子:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.getOrCreate()
columns = ["CATEGORY", "REVENUE"]
data = [("Cell Phone", "6000"),
("Tablet", "1500"),
("Tablet", "5500"),
("Cell Phone", "5000"),
("Cell Phone", "6000"),
("Tablet", "2500"),
("Cell Phone", "3000"),
("Cell Phone", "3000"),
("Tablet", "3000"),
("Tablet", "4500"),
("Tablet", "6500")]
df = spark.createDataFrame(data=data, schema=columns)
window_spec = Window.partitionBy(df['CATEGORY']).orderBy(df['REVENUE'])
revenue_difference = F.max(df['REVENUE']).over(window_spec)
df.select(
df['CATEGORY'],
df['REVENUE'],
revenue_difference.alias("revenue_difference")).show()
所以当我写
orderBy(df['REVENUE'])
时,我得到这个:
+----------+-------+------------------+
| CATEGORY|REVENUE|revenue_difference|
+----------+-------+------------------+
|Cell Phone| 3000| 3000|
|Cell Phone| 3000| 3000|
|Cell Phone| 5000| 5000|
|Cell Phone| 6000| 6000|
|Cell Phone| 6000| 6000|
| Tablet| 1500| 1500|
| Tablet| 2500| 2500|
| Tablet| 3000| 3000|
| Tablet| 4500| 4500|
| Tablet| 5500| 5500|
| Tablet| 6500| 6500|
+----------+-------+------------------+
但是当我写
orderBy(df['REVENUE']).desc()
时,我得到了这个:
+----------+-------+------------------+
| CATEGORY|REVENUE|revenue_difference|
+----------+-------+------------------+
|Cell Phone| 6000| 6000|
|Cell Phone| 6000| 6000|
|Cell Phone| 5000| 6000|
|Cell Phone| 3000| 6000|
|Cell Phone| 3000| 6000|
| Tablet| 6500| 6500|
| Tablet| 5500| 6500|
| Tablet| 4500| 6500|
| Tablet| 3000| 6500|
| Tablet| 2500| 6500|
| Tablet| 1500| 6500|
+----------+-------+------------------+
我不明白,因为在我看来,无论顺序如何,每个窗口中的最大值都保持不变。那么有人可以解释一下我在这里没有得到什么吗?
谢谢!
原因很简单,默认的窗口范围/行规格是
Window.UnboundedPreceding
到Window.CurrentRow
,这意味着最大值是从该分区的第一行到当前行,而不是分区的最后一行。
这是一个常见的问题。 (您可以将
.max()
替换为 sum()
并查看得到的输出。它也会根据您对分区的排序方式而变化。)
要解决此问题,您可以指定始终使用全窗口分区来计算每个分区的最大值,如下所示:
window_spec = Window.partitionBy(df['CATEGORY']).orderBy(df['REVENUE']).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
revenue_difference = F.max(df['REVENUE']).over(window_spec)
df.select(
df['CATEGORY'],
df['REVENUE'],
revenue_difference.alias("revenue_difference")).show()
+----------+-------+------------------+
| CATEGORY|REVENUE|revenue_difference|
+----------+-------+------------------+
| Tablet| 6500| 6500|
| Tablet| 5500| 6500|
| Tablet| 4500| 6500|
| Tablet| 3000| 6500|
| Tablet| 2500| 6500|
| Tablet| 1500| 6500|
|Cell Phone| 6000| 6000|
|Cell Phone| 6000| 6000|
|Cell Phone| 5000| 6000|
|Cell Phone| 3000| 6000|
|Cell Phone| 3000| 6000|
+----------+-------+------------------+
这是我刚刚遇到的一个有趣的行为。 我的理解是,当我们将
orderBy
添加到 Windows 定义中时,Spark 将在每个分区内维护一个逻辑指针。当指针移动穿过每一行时,考虑到定义的帧,基于有序行直到该点计算窗口函数。这对于对分区内的行进行排名或编号等情况非常方便。
另一方面,当窗口是无序窗口时,Spark会立即考虑整个分区上的窗口函数。分区内的行顺序没有明确定义,因此窗口函数将将分区中的所有行作为单个帧进行操作。对于像
max
这样我们不需要任何特殊排序的聚合,将适合该组。
最后在你的例子中
/* Window definition without orderBy */
window_spec = Window.partitionBy(df['CATEGORY'])
revenue_difference = F.max(df['REVENUE']).over(window_spec)
df.select(
df['CATEGORY'],
df['REVENUE'],
revenue_difference.alias("revenue_difference")).show()
+----------+-------+------------------+
| CATEGORY|REVENUE|revenue_difference|
+----------+-------+------------------+
|Cell Phone| 6000| 6000|
|Cell Phone| 6000| 6000|
|Cell Phone| 5000| 6000|
|Cell Phone| 3000| 6000|
|Cell Phone| 3000| 6000|
| Tablet| 6500| 6500|
| Tablet| 5500| 6500|
| Tablet| 4500| 6500|
| Tablet| 3000| 6500|
| Tablet| 2500| 6500|
| Tablet| 1500| 6500|
+----------+-------+------------------+