我发现
dataframe.agg(avg(Col)
工作正常,但是当我在整个列的窗口上计算 avg() 时(不使用任何分区),我会根据与 orderBy 一起使用的列看到不同的结果。
示例代码:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sample_for_SE").getOrCreate()
# Sample data
data = [
(1, 10.0, 5.0),
(3, 20.0, None),
(5, 15.0, None)
]
schema = ["id", "value1", "value2"]
df = spark.createDataFrame(data, schema=schema)
# Display DataFrame and avg()
df.show()
df.agg(avg("value1")).show()
输出正确显示 DF 和 avg:
+---+------+------+
| id|value1|value2|
+---+------+------+
| 1| 10.0| 5.0|
| 3| 20.0| NULL|
| 5| 15.0| NULL|
+---+------+------+
+-----------+
|avg(value1)|
+-----------+
| 15.0|
+-----------+
但是使用窗口函数:
from pyspark.sql.window import Window
#with orderBy("value1")
#========================
w = Window.orderBy("value1")
df.withColumn("AVG",avg(col("value1")).over(w))\
.sort("id",ascending=True)\
.show()
#with orderBy("id")
#========================
w = Window.orderBy("id")
df.withColumn("AVG",avg(col("value1")).over(w))\
.sort("id",ascending=True)\
.show()
输出:
| id|value1|value2| AVG|
+---+------+------+----+
| 1| 10.0| 5.0|10.0|
| 3| 20.0| NULL|15.0|
| 5| 15.0| NULL|12.5|
+---+------+------+----+
+---+------+------+----+
| id|value1|value2| AVG|
+---+------+------+----+
| 1| 10.0| 5.0|10.0|
| 3| 20.0| NULL|15.0|
| 5| 15.0| NULL|15.0|
+---+------+------+----+
问题:
avg()
?这也是我在实验中得到的,也是经过相当长的一段时间才明白的。我后来找到了一个参考链接,其中解释了这个问题,但我再也找不到了。
无论如何,发生这种情况的原因如下:每当我们在 Spark 中使用
Window
函数并决定使用 .orderBy()
对其进行排序时,都会有一个可选参数 .rangeBetween
,默认情况下隐藏设置为 (Window.unboundedPreceding, Window.currentRow)
,这意味着每行的移动平均值是通过对当前行和数据帧的第一行之间的所有值进行平均而得出的。
示例:
要克服这个“问题/功能”,您只需在窗口的定义中指定您想要的 WindowSpec,即:
w = Window.orderBy("value1").rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)