I have a PySpark dataframe that looks like this:
HierarchyNode      ParentNode
USREBT2.0.1        USREBT2
USREBT2.1.1        USREBT2.0.1
1004052024.0.1     1004052024
1004052024.1.1     1004052024.0.1
1004052024.1.1.1   1004052024.1.1
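A minimal snippet to reproduce this sample dataframe (the real data of course comes from somewhere else):

data = [
    ("USREBT2.0.1",      "USREBT2"),
    ("USREBT2.1.1",      "USREBT2.0.1"),
    ("1004052024.0.1",   "1004052024"),
    ("1004052024.1.1",   "1004052024.0.1"),
    ("1004052024.1.1.1", "1004052024.1.1"),
]
df = spark.createDataFrame(data, ["HierarchyNode", "ParentNode"])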
It is essentially a hierarchy table. I want to flatten it, and the expected output should be:
ParentWBSElementExternalID   LN-1             LN-2             LN-3
USREBT2                      NULL             NULL             NULL
USREBT2                      USREBT2.0.1      NULL             NULL
USREBT2                      USREBT2.0.1      USREBT2.1.1      NULL
1004052024                   NULL             NULL             NULL
1004052024                   1004052024.0.1   NULL             NULL
1004052024                   1004052024.0.1   1004052024.1.1   NULL
1004052024                   1004052024.0.1   1004052024.1.1   1004052024.1.1.1
Using a recursive CTE would be one solution, but unfortunately we cannot use recursive CTEs in Databricks. Also, as you can see, a HierarchyNode (i.e. a child node) can sit 2 levels deep, 3 levels deep, and so on, so the depth is dynamic.
I have already gone through this SO link, but did not find a working answer there.
Any help using PySpark would be much appreciated.
Yes, you can do this with a recursive CTE. You are also right that Databricks SQL has no recursive CTE. However, as you can see in this excellent article, there is a PySpark equivalent.
I will base the solution to your problem on that approach, using both PySpark and SQL.
First, store the dataframe in a temporary view.
df.createOrReplaceTempView('test1')
Now, following the article mentioned above, introduce the recursive CTE.
i = 1

df = spark.sql("""
    SELECT ParentNode,
           HierarchyNode,
           1 AS level
    FROM test1
""")

# this view is our 'CTE' that we reference with each pass
df.createOrReplaceTempView('recursion_df')

print("entering loop")

while True:
    # select data for this recursion level
    new_df = spark.sql("""
        SELECT r.ParentNode,
               t.HierarchyNode,
               (r.level + 1) AS level
        FROM test1 t
        INNER JOIN recursion_df r
                ON r.HierarchyNode = t.ParentNode
    """)
    # this view is our 'CTE' that we reference with each pass
    new_df.createOrReplaceTempView('recursion_df')
    # add the results to the main output dataframe
    df = df.union(new_df)
    # if there are no results at this recursion level then break
    print(f"{i} : {new_df.count()}")
    if new_df.count() == 0 or i == 5:
        df.createOrReplaceTempView("final_df")
        break
    else:
        i += 1
The output also carries an extra column named level, which is the recursion depth.
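With the sample data from the question, final_df should end up looking roughly like this (row order may vary):

spark.table("final_df").orderBy("level", "ParentNode").show(truncate=False)
# +--------------+----------------+-----+
# |ParentNode    |HierarchyNode   |level|
# +--------------+----------------+-----+
# |1004052024    |1004052024.0.1  |1    |
# |1004052024.0.1|1004052024.1.1  |1    |
# |1004052024.1.1|1004052024.1.1.1|1    |
# |USREBT2       |USREBT2.0.1     |1    |
# |USREBT2.0.1   |USREBT2.1.1     |1    |
# |1004052024    |1004052024.1.1  |2    |
# |1004052024.0.1|1004052024.1.1.1|2    |
# |USREBT2       |USREBT2.1.1     |2    |
# |1004052024    |1004052024.1.1.1|3    |
# +--------------+----------------+-----+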
Now create a temporary view over the result to pick out the root parent nodes.
%sql
CREATE OR REPLACE TEMPORARY VIEW TEST1 AS
SELECT ParentNode FROM final_df WHERE level = 1 and ParentNode NOT IN (SELECT HierarchyNode FROM final_df);
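If you prefer to stay in the DataFrame API, a rough equivalent of that view (just a sketch; the SQL above is all the rest of the answer needs) is a left_anti join:

from pyspark.sql import functions as F

final_df = spark.table("final_df")
roots = (final_df.filter(F.col("level") == 1)
                 .select("ParentNode")
                 .join(final_df.select(F.col("HierarchyNode").alias("ParentNode")),
                       on="ParentNode", how="left_anti")   # keep only parents that never appear as a child
                 .distinct())
roots.createOrReplaceTempView("TEST1")

For the sample data this leaves exactly two roots: USREBT2 and 1004052024.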
Since you already know the depth cannot exceed 3, you can build a union statement like this:
%sql
SELECT c.ParentNode, r.HierarchyNode AS LN_1, NULL AS LN_2, NULL AS LN_3
FROM TEST1 c
LEFT JOIN final_df r ON r.ParentNode = c.ParentNode AND r.level = 0
UNION
SELECT c.ParentNode, r.HierarchyNode AS LN_1, NULL AS LN_2, NULL AS LN_3
FROM TEST1 c
LEFT JOIN final_df r ON r.ParentNode = c.ParentNode AND r.level = 1
UNION
SELECT c.ParentNode, r.HierarchyNode AS LN_1, s.HierarchyNode AS LN_2, NULL AS LN_3
FROM TEST1 c
LEFT JOIN final_df r ON r.ParentNode = c.ParentNode AND r.level = 1
LEFT JOIN final_df s ON s.ParentNode = c.ParentNode AND s.level = 2
UNION
SELECT c.ParentNode, r.HierarchyNode AS LN_1, s.HierarchyNode AS LN_2, t.HierarchyNode AS LN_3
FROM TEST1 c
LEFT JOIN final_df r ON r.ParentNode = c.ParentNode AND r.level = 1
LEFT JOIN final_df s ON s.ParentNode = c.ParentNode AND s.level = 2
LEFT JOIN final_df t ON t.ParentNode = c.ParentNode AND t.level = 3
;
The result of this union matches the expected output shown in the question.
Update:
The answer above assumes a recursion depth of 3. Now suppose we do not know the depth in advance. First run the PySpark recursive-CTE routine as before. You still need to cap the recursion at some maximum value; it can be pretty much anything (say 100), because the loop breaks as soon as a level returns no more rows or the cap is reached.
recursion_lvl = 5
i = 1

df = spark.sql("""
    SELECT ParentNode,
           HierarchyNode,
           1 AS level
    FROM test1
""")

# this view is our 'CTE' that we reference with each pass
df.createOrReplaceTempView('recursion_df')

print("entering loop")

while True:
    # select data for this recursion level
    new_df = spark.sql("""
        SELECT r.ParentNode,
               t.HierarchyNode,
               (r.level + 1) AS level
        FROM test1 t
        INNER JOIN recursion_df r
                ON r.HierarchyNode = t.ParentNode
    """)
    # this view is our 'CTE' that we reference with each pass
    new_df.createOrReplaceTempView('recursion_df')
    # add the results to the main output dataframe
    df = df.union(new_df)
    # if there are no results at this recursion level then break
    print(f"{i} : {new_df.count()}")
    if new_df.count() == 0 or i == recursion_lvl:
        df.createOrReplaceTempView("final_df")
        break
    else:
        i += 1
Now get the unique base roots (as shown above):
%sql
CREATE OR REPLACE TEMPORARY VIEW TEST1 AS
SELECT ParentNode FROM final_df WHERE level = 1 and ParentNode NOT IN (SELECT HierarchyNode FROM final_df);
SELECT * FROM TEST1;
Now get the maximum recursion level present in the data:
lvldf = spark.sql("SELECT MAX(level) AS MAXLVL from final_df;")
lvlmax = lvldf.head().asDict()['MAXLVL']
print(lvlmax)
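For the sample data this prints 3. If you prefer, the same value can also be read through the DataFrame API:

from pyspark.sql import functions as F

lvlmax = spark.table("final_df").agg(F.max("level")).first()[0]
print(lvlmax)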
Finally, the more involved code that runs the loop: create an empty final table; insert the rows for the base parent roots, with NULLs in the remaining columns; then loop, building the INSERT statements and executing them against the final table.
# build the SELECT that inserts the root rows (all LN columns are NULL;
# the level = 0 join never matches, so each root appears exactly once)
col = ""
for i in range(lvlmax):
    col = col + f"NULL AS LN_{i+1}"
    if (i+1) < lvlmax:
        col = col + ','
unionsql = "SELECT c.ParentNode," + col + ' FROM TEST1 c LEFT JOIN final_df r ON r.ParentNode = c.ParentNode AND r.level = 0'
print(unionsql)
createsql = "CREATE OR REPLACE TABLE final_table (HierarchyNode STRING, "
for i in range(lvlmax):
    createsql = createsql + f"LN_{i+1} STRING"
    if (i+1) < lvlmax:
        createsql = createsql + ", "
createsql = createsql + ");"
print(createsql)

spark.sql(createsql)                              # execute the create table statement
spark.sql("INSERT INTO final_table " + unionsql)  # INSERT the root parents with nulls for the remaining cols
#Check the max
lvldf = spark.sql("SELECT MAX(level) AS MAXLVL from final_df;")
lvlmax = lvldf.head().asDict()['MAXLVL']
print(lvlmax)
# build and run one INSERT per hierarchy depth
sql = "INSERT INTO final_table SELECT c.ParentNode, "
jointmp = ""
for i in range(lvlmax):
    # columns for the levels that exist at this depth
    for j in range(i+1):
        sql = sql + f" a{j+1}.HierarchyNode AS LN_{j+1}"
        if (j+1) < lvlmax:
            sql = sql + ','
    # pad the remaining level columns with NULL
    for k in range(i+1, lvlmax):
        sql = sql + f" NULL as LN_{k+1}"
        if (k+1) < lvlmax:
            sql = sql + ','
    # one extra join per depth, reused by the deeper statements
    jointmp = jointmp + f" LEFT JOIN final_df a{i+1} ON a{i+1}.ParentNode = c.ParentNode AND a{i+1}.level = {i+1}"
    sql = sql + " FROM TEST1 c " + jointmp
    print(sql)
    spark.sql(sql)
    # reset for the next depth
    sql = "INSERT INTO final_table SELECT c.ParentNode, "
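To make the string building easier to follow: for lvlmax = 3, the print(sql) calls above should emit statements along these lines (each is really a single line; wrapped here for readability, with a similar intermediate statement for LN_1/LN_2 in between):

INSERT INTO final_table SELECT c.ParentNode, a1.HierarchyNode AS LN_1, NULL as LN_2, NULL as LN_3
    FROM TEST1 c LEFT JOIN final_df a1 ON a1.ParentNode = c.ParentNode AND a1.level = 1

INSERT INTO final_table SELECT c.ParentNode, a1.HierarchyNode AS LN_1, a2.HierarchyNode AS LN_2, a3.HierarchyNode AS LN_3
    FROM TEST1 c LEFT JOIN final_df a1 ON a1.ParentNode = c.ParentNode AND a1.level = 1
                 LEFT JOIN final_df a2 ON a2.ParentNode = c.ParentNode AND a2.level = 2
                 LEFT JOIN final_df a3 ON a3.ParentNode = c.ParentNode AND a3.level = 3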
The final table will contain duplicates, so to get exactly what you need, simply select distinct rows like this:
%sql
SELECT DISTINCT * FROM final_table;
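If you would rather finish in PySpark, an equivalent one-liner (a small sketch over the final_table created above) is:

flat_df = spark.table("final_table").dropDuplicates()   # same as SELECT DISTINCT *
flat_df.show(truncate=False)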