我有一个输入表,其中包含 orderid 、 range 和 value1 、 value2 和 value3 。 我想根据 orderid 和范围创建 3 个新列 previousvalue1、previousvalue2、previousvalue3。这是示例输入和预期输出。
输入- 输入表
输出- 输出表
我尝试使用 .lag 函数和窗口函数 - 按 orderid、range orderby 范围分区,但它从同一范围内的第一行 avobe 中选取最后一个值,而不是前一个范围。
将分区视为独立的实体。分区之间没有值传递(这正是您想要的输出)。
您需要根据
lag
列值应用条件和 range
函数,并将 orderid
保留为分区列,如下所示以获得正确的输出:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.getOrCreate()
data = [
(1, 100, 10),
(1, 100, 10),
(1, 150, 20),
(1, 300, 40),
(1, 200, 22),
(1, 200, 22),
(1, 400, 60),
(2, 150, 10),
(2, 150, 10),
(2, 250, 15),
(2, 300, 20),
(2, 300, 20)
]
df = spark.createDataFrame(data, ["orderid", "range", "value1"])
windowSpec = Window.partitionBy("orderid").orderBy("range")
df = df.withColumn(
"temp",
F.when(
F.col("range") != F.lag("range").over(windowSpec),
F.lag("value1").over(windowSpec)
).otherwise(None))
df = df.withColumn(
"previousvalue1",
F.last("temp", ignorenulls=True).over(windowSpec)
).drop("temp")
df.show()
输出:
+-------+-----+------+--------------+
|orderid|range|value1|previousvalue1|
+-------+-----+------+--------------+
| 1| 100| 10| NULL|
| 1| 100| 10| NULL|
| 1| 150| 20| 10|
| 1| 200| 22| 20|
| 1| 200| 22| 20|
| 1| 300| 40| 22|
| 1| 400| 60| 40|
| 2| 150| 10| NULL|
| 2| 150| 10| NULL|
| 2| 250| 15| 10|
| 2| 300| 20| 15|
| 2| 300| 20| 15|
+-------+-----+------+--------------+
我正在根据滞后值是否与
temp
中的当前值相同来创建一个 range
列。然后,我使用每个分区的 temp
中的最后一个非空值填充该列。
您可以对其他两列重复相同的操作。