在 Spark 中明智地获取上一个值分区

问题描述 投票:0回答:1

我有一个输入表,其中包含 orderid 、 range 和 value1 、 value2 和 value3 。 我想根据 orderid 和范围创建 3 个新列 previousvalue1、previousvalue2、previousvalue3。这是示例输入和预期输出。

输入- 输入表

输出- 输出表

我尝试使用 .lag 函数和窗口函数 - 按 orderid、range orderby 范围分区,但它从同一范围内的第一行 avobe 中选取最后一个值,而不是前一个范围。

sql apache-spark hadoop pyspark bigdata
1个回答
0
投票

将分区视为独立的实体。分区之间没有值传递(这正是您想要的输出)。

您需要根据

lag
列值应用条件和
range
函数,并将
orderid
保留为分区列,如下所示以获得正确的输出:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

data = [
    (1, 100, 10),
    (1, 100, 10),
    (1, 150, 20),
    (1, 300, 40),
    (1, 200, 22),
    (1, 200, 22),
    (1, 400, 60),
    (2, 150, 10),
    (2, 150, 10),
    (2, 250, 15),
    (2, 300, 20),
    (2, 300, 20)
]

df = spark.createDataFrame(data, ["orderid", "range", "value1"])

windowSpec = Window.partitionBy("orderid").orderBy("range")

df = df.withColumn(
    "temp", 
    F.when(
        F.col("range") != F.lag("range").over(windowSpec),
        F.lag("value1").over(windowSpec)
    ).otherwise(None))

df = df.withColumn(
    "previousvalue1", 
    F.last("temp", ignorenulls=True).over(windowSpec)
).drop("temp")

df.show()

输出:

+-------+-----+------+--------------+
|orderid|range|value1|previousvalue1|
+-------+-----+------+--------------+
|      1|  100|    10|          NULL|
|      1|  100|    10|          NULL|
|      1|  150|    20|            10|
|      1|  200|    22|            20|
|      1|  200|    22|            20|
|      1|  300|    40|            22|
|      1|  400|    60|            40|
|      2|  150|    10|          NULL|
|      2|  150|    10|          NULL|
|      2|  250|    15|            10|
|      2|  300|    20|            15|
|      2|  300|    20|            15|
+-------+-----+------+--------------+

我正在根据滞后值是否与

temp
中的当前值相同来创建一个
range
列。然后,我使用每个分区的
temp
中的最后一个非空值填充该列。

您可以对其他两列重复相同的操作。

© www.soinside.com 2019 - 2024. All rights reserved.