PySpark: dynamically flatten a hierarchy table

votes: 0 · answers: 1

I have a PySpark DataFrame that looks like this:

HierarchyNode       ParentNode
 USREBT2.0.1        USREBT2
 USREBT2.1.1        USREBT2.0.1
 1004052024.0.1     1004052024
 1004052024.1.1     1004052024.0.1
 1004052024.1.1.1   1004052024.1.1
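
For reference, this sample frame can be reproduced like so (a minimal sketch, assuming an active SparkSession named spark):

df = spark.createDataFrame(
    [
        ("USREBT2.0.1", "USREBT2"),
        ("USREBT2.1.1", "USREBT2.0.1"),
        ("1004052024.0.1", "1004052024"),
        ("1004052024.1.1", "1004052024.0.1"),
        ("1004052024.1.1.1", "1004052024.1.1"),
    ],
    ["HierarchyNode", "ParentNode"],
)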

This is essentially a hierarchy table. I want to flatten it, and the expected output is:

ParentWBSElementExternalID  LN-1            LN-2              LN-3
 USREBT2                    NULL            NULL              NULL
 USREBT2                    USREBT2.0.1     NULL              NULL
 USREBT2                    USREBT2.0.1     USREBT2.1.1       NULL
 1004052024                 NULL            NULL              NULL
 1004052024                 1004052024.0.1  NULL              NULL
 1004052024                 1004052024.0.1  1004052024.1.1    NULL
 1004052024                 1004052024.0.1  1004052024.1.1    1004052024.1.1.1

Using a recursive CTE would be one solution, but unfortunately recursive CTEs are not available in Databricks. Also, as you can see, a HierarchyNode can be nested 2 levels deep, 3 levels deep, and so on, so the depth is dynamic.

I have already followed this SO link, but did not get a working answer there.

Any help using PySpark would be much appreciated.

pyspark databricks
1 Answer

2 votes

Yes, you can do this with a recursive CTE. You are also correct that there is no recursive CTE in Databricks SQL. However, as you can see in this excellent article, there is a PySpark equivalent.

I will use that as the basis for solving your problem, using both PySpark and SQL.

First, register the DataFrame as a temporary view:

df.createOrReplaceTempView('test1')

Now, following the article mentioned above, emulate the recursive CTE:

i = 1
df = spark.sql("""
    SELECT ParentNode,
           HierarchyNode,
           1 AS level
    FROM test1
    """)
# this view is our 'CTE' that we reference with each pass
df.createOrReplaceTempView('recursion_df')

print("entering loop")
while True:
    # select data for this recursion level
    new_df = spark.sql("""
        SELECT r.ParentNode,
               t.HierarchyNode,
               (r.level + 1) AS level
        FROM test1 t
        INNER JOIN recursion_df r
            ON r.HierarchyNode = t.ParentNode
        """)

    # this view is our 'CTE' that we reference with each pass
    new_df.createOrReplaceTempView('recursion_df')
    # add the results to the main output dataframe
    df = df.union(new_df)
    # count once so the computation is not triggered twice
    row_count = new_df.count()
    print(f"{i} : {row_count}")
    # if there were no results at this recursion level then stop
    if row_count == 0 or i == 5:
        df.createOrReplaceTempView("final_df")
        break
    else:
        i += 1

You can inspect the final_df view to see what it contains.

It also has an additional column, level, which records the recursion depth.
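
For the sample data above, final_df should work out as follows (each level-n row pairs a HierarchyNode with its ancestor n levels up):

ParentNode        HierarchyNode      level
 USREBT2           USREBT2.0.1        1
 USREBT2.0.1       USREBT2.1.1        1
 1004052024        1004052024.0.1     1
 1004052024.0.1    1004052024.1.1     1
 1004052024.1.1    1004052024.1.1.1   1
 USREBT2           USREBT2.1.1        2
 1004052024        1004052024.1.1     2
 1004052024.0.1    1004052024.1.1.1   2
 1004052024        1004052024.1.1.1   3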

Now create a temporary view over these results to get the root parent nodes, i.e. the nodes that appear as a ParentNode but never as a HierarchyNode.

%sql
CREATE OR REPLACE TEMPORARY VIEW TEST1 AS
SELECT ParentNode FROM final_df WHERE level = 1 AND ParentNode NOT IN (SELECT HierarchyNode FROM final_df);

(For the sample data, TEST1 contains the two roots: USREBT2 and 1004052024.)

Since you already know the depth cannot exceed 3 here, you can build a union statement like this:

%sql
-- r.level = 0 never matches, so the first branch emits each root with all-NULL level columns
SELECT c.ParentNode, r.HierarchyNode AS LN_1, NULL AS LN_2, NULL AS LN_3 FROM TEST1 c LEFT JOIN final_df r ON r.ParentNode = c.ParentNode AND r.level = 0
UNION
SELECT c.ParentNode, r.HierarchyNode AS LN_1, NULL AS LN_2, NULL AS LN_3 FROM TEST1 c LEFT JOIN final_df r ON r.ParentNode = c.ParentNode AND r.level = 1
UNION
SELECT c.ParentNode, r.HierarchyNode AS LN_1, s.HierarchyNode AS LN_2, NULL AS LN_3 FROM TEST1 c
LEFT JOIN final_df r ON r.ParentNode = c.ParentNode AND r.level = 1
LEFT JOIN final_df s ON s.ParentNode = c.ParentNode AND s.level = 2
UNION
SELECT c.ParentNode, r.HierarchyNode AS LN_1, s.HierarchyNode AS LN_2, t.HierarchyNode AS LN_3 FROM TEST1 c
LEFT JOIN final_df r ON r.ParentNode = c.ParentNode AND r.level = 1
LEFT JOIN final_df s ON s.ParentNode = c.ParentNode AND s.level = 2
LEFT JOIN final_df t ON t.ParentNode = c.ParentNode AND t.level = 3
;

And your result matches the expected output from the question.

Update

The answer above assumes the recursion depth is 3. Now suppose we do not know the depth in advance. First, run the PySpark recursive-CTE emulation again. You still need to cap the recursion at some maximum level; it can really be anything (say, 100), since the loop breaks as soon as there are no more elements or the maximum recursion is reached.

recursion_lvl = 5
i = 1
df = spark.sql("""
    SELECT ParentNode,
           HierarchyNode,
           1 AS level
    FROM test1
    """)
# this view is our 'CTE' that we reference with each pass
df.createOrReplaceTempView('recursion_df')

print("entering loop")
while True:
    # select data for this recursion level
    new_df = spark.sql("""
        SELECT r.ParentNode,
               t.HierarchyNode,
               (r.level + 1) AS level
        FROM test1 t
        INNER JOIN recursion_df r
            ON r.HierarchyNode = t.ParentNode
        """)

    # this view is our 'CTE' that we reference with each pass
    new_df.createOrReplaceTempView('recursion_df')
    # add the results to the main output dataframe
    df = df.union(new_df)
    # count once so the computation is not triggered twice
    row_count = new_df.count()
    print(f"{i} : {row_count}")
    # if there were no results at this recursion level then stop
    if row_count == 0 or i == recursion_lvl:
        df.createOrReplaceTempView("final_df")
        break
    else:
        i += 1

Now get the unique base roots (as shown above):

%sql
CREATE OR REPLACE TEMPORARY VIEW TEST1 AS
SELECT ParentNode FROM final_df WHERE level = 1 AND ParentNode NOT IN (SELECT HierarchyNode FROM final_df);

SELECT * FROM TEST1;

Now get the maximum recursion level present in the data:

lvldf = spark.sql("SELECT MAX(level) AS MAXLVL FROM final_df")
lvlmax = lvldf.head().asDict()['MAXLVL']
print(lvlmax)
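
For the sample data, the deepest chain is 1004052024 → 1004052024.0.1 → 1004052024.1.1 → 1004052024.1.1.1, so this prints 3.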

Finally, the more involved code that performs the loop:

- create an empty final table
- insert one row per base parent root, with NULLs for the remaining columns
- loop, building the INSERT statements and executing them against the final table

# build the root INSERT: one row per root with NULLs for every level column
col = ""
for i in range(lvlmax):
    col = col + f"NULL AS LN_{i+1}"
    if (i+1) < lvlmax:
        col = col + ','
# the join on level = 0 never matches, so only the bare roots come through
unionsql = "SELECT c.ParentNode," + col + ' FROM TEST1 c LEFT JOIN final_df r ON r.ParentNode = c.ParentNode AND r.level = 0'
print(unionsql)

# create an empty final table with one column per hierarchy level
createsql = "CREATE OR REPLACE TABLE final_table (ParentNode STRING, "
for i in range(lvlmax):
    createsql = createsql + f"LN_{i+1} STRING"
    if (i+1) < lvlmax:
        createsql = createsql + ", "
createsql = createsql + ");"
print(createsql)

spark.sql(createsql)   # execute the CREATE TABLE statement
spark.sql("INSERT INTO final_table " + unionsql)  # insert the root parents with NULLs for the remaining cols

# for each depth i, build an INSERT that selects levels 1..i+1 and pads the rest with NULLs
sql = "INSERT INTO final_table SELECT c.ParentNode, "
jointmp = ""
for i in range(lvlmax):
    for j in range(i+1):
        sql = sql + f" a{j+1}.HierarchyNode AS LN_{j+1}"
        if (j+1) < lvlmax:
            sql = sql + ','
    for k in range(i+1, lvlmax):
        sql = sql + f" NULL AS LN_{k+1}"
        if (k+1) < lvlmax:
            sql = sql + ','
    # each pass chains one more self-join against the next recursion level
    jointmp = jointmp + f" LEFT JOIN final_df a{i+1} ON a{i+1}.ParentNode = c.ParentNode AND a{i+1}.level = {i+1}"
    sql = sql + " FROM TEST1 c" + jointmp
    print(sql)
    spark.sql(sql)
    sql = "INSERT INTO final_table SELECT c.ParentNode, "
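
To make the string building concrete: with lvlmax = 3, the loop emits three statements along these lines (whitespace tidied):

INSERT INTO final_table SELECT c.ParentNode, a1.HierarchyNode AS LN_1, NULL AS LN_2, NULL AS LN_3 FROM TEST1 c LEFT JOIN final_df a1 ON a1.ParentNode = c.ParentNode AND a1.level = 1

INSERT INTO final_table SELECT c.ParentNode, a1.HierarchyNode AS LN_1, a2.HierarchyNode AS LN_2, NULL AS LN_3 FROM TEST1 c LEFT JOIN final_df a1 ON a1.ParentNode = c.ParentNode AND a1.level = 1 LEFT JOIN final_df a2 ON a2.ParentNode = c.ParentNode AND a2.level = 2

INSERT INTO final_table SELECT c.ParentNode, a1.HierarchyNode AS LN_1, a2.HierarchyNode AS LN_2, a3.HierarchyNode AS LN_3 FROM TEST1 c LEFT JOIN final_df a1 ON a1.ParentNode = c.ParentNode AND a1.level = 1 LEFT JOIN final_df a2 ON a2.ParentNode = c.ParentNode AND a2.level = 2 LEFT JOIN final_df a3 ON a3.ParentNode = c.ParentNode AND a3.level = 3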

The final table will contain duplicates, so to get exactly what you need, simply select the distinct rows, which reproduces the expected output:

%sql
SELECT DISTINCT * FROM final_table;