如何使用 PySpark 对这些相关行进行分组?

问题描述 投票:0回答:1

我需要在 Databricks 上的 Python PySpark 中解析一些文本数据。数据如下:

df = spark.createDataFrame([("new entry", 1, 123), 
    ("acct", 2, None), 
    ("cust ID", 3, None),
    ("new entry", 4, 456),
    ("acct", 5, None),
    ("more text", 6, None),
    ("cust ID", 7, None)], 
    ("value", "line num", "tracking ID"))

这里我手动添加了“需要分组”列来说明 - 从“新条目”到“客户 ID”的行是一组,后面是另一组。它们的长度并不相同。

enter image description here

我需要在几行之前将客户 ID 与跟踪 ID 进行匹配,如下所示:

enter image description here

如何将客户 ID 与跟踪 ID 匹配?我想到了一个窗口函数,但我不确定如何创建所需的分组。

python pyspark databricks
1个回答
0
投票

要解决您的问题,请使用

window.orderBy
函数和
last()
来填充前向值。

代码:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, last

df = spark.createDataFrame([
("new entry", 1, 123),
("acct", 2, None),
("cust ID", 3, None),
("new entry", 4, 456),
("acct", 5, None),
("more text", 6, None),
("cust ID", 7, None)
], ["value", "line num", "tracking ID"])

# use window function
window_fun = Window.orderBy("line num")
df_filled = df.withColumn("tracking_ID_fil", last("tracking ID", True).over(window_fun))
display(df_filled)

# Use Filter
res1 = df_filled.filter(col("value") == "cust ID").select("value", "tracking_ID_fil")

# Rename column values
dff1 = res1.withColumnRenamed("tracking_ID_fil", "tracking ID")
display(dff1)

输出: enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.