I have a large dataset of network elements (think: big data) that form a tree-like network.
A toy dataset looks like this:
| id | type | parent_id |
|-----:|:-------|:------------|
| 1 | D | <NA> |
| 2 | C | 1 |
| 3 | C | 2 |
| 4 | C | 3 |
| 5 | B | 3 |
| 6 | B | 4 |
| 7 | A | 4 |
| 8 | A | 5 |
| 9 | A | 3 |
What I want to achieve is an efficient way (ideally in Spark) to compute the tree path for every node, like this:
| id | type | parent_id | path |
|-----:|:-------|:------------|:--------------------|
| 1 | D | <NA> | D:1 |
| 2 | C | 1 | D:1>C:2 |
| 3 | C | 2 | D:1>C:2>C:3 |
| 4 | C | 3 | D:1>C:2>C:3>C:4 |
| 5 | B | 3 | D:1>C:2>C:3>B:5 |
| 6 | B | 4 | D:1>C:2>C:3>C:4>B:6 |
| 7 | A | 4 | D:1>C:2>C:3>C:4>A:7 |
| 8 | A | 5 | D:1>C:2>C:3>B:5>A:8 |
| 9 | A | 3 | D:1>C:2>C:3>A:9 |
Note: each element in the tree path is constructed as `type:id` (e.g. `D:1`).
If you have other efficient ways to store tree paths (e.g., a closure table) and to compute them, I am happy to hear about them as well. However, the computation must run in well under an hour (ideally a few minutes), and later retrieval needs to happen in the range of seconds.
The end goal is a data structure that allows me to efficiently aggregate any network nodes below a given node (taking at most a few seconds).
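For context on why a materialized path meets that retrieval goal: once each row carries its full `path`, aggregating everything below a node reduces to a string-prefix match. A minimal pandas sketch (the `subtree` helper is hypothetical, using the toy paths from the table above):

```python
import pandas as pd

# Toy frame with precomputed paths, matching the table above.
df = pd.DataFrame({
    'id':   [1, 2, 3, 4, 5],
    'type': ['D', 'C', 'C', 'C', 'B'],
    'path': ['D:1', 'D:1>C:2', 'D:1>C:2>C:3', 'D:1>C:2>C:3>C:4', 'D:1>C:2>C:3>B:5'],
})

def subtree(frame, node_id):
    """Return the node itself plus all descendants via a path-prefix match.

    Matching the exact path or 'path>' avoids false hits such as
    'C:30' matching the bare prefix 'C:3'.
    """
    root_path = frame.loc[frame['id'] == node_id, 'path'].iloc[0]
    mask = (frame['path'] == root_path) | frame['path'].str.startswith(root_path + '>')
    return frame[mask]

print(subtree(df, 3)['id'].tolist())  # → [3, 4, 5]
```

Any aggregation (counts, sums per `type`, etc.) can then run on the returned slice.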
The real dataset, consisting of roughly 3M nodes, can be built like this:
```python
import random

import pandas as pd

random.seed(1337)

node_counts = {'A': 1424383, 'B': 596994, 'C': 234745, 'D': 230937, 'E': 210663, 'F': 122859, 'G': 119453, 'H': 57462, 'I': 23260, 'J': 15008, 'K': 10666, 'L': 6943, 'M': 6724, 'N': 2371, 'O': 2005, 'P': 385}
#node_counts = {'A': 3, 'B': 2, 'C': 3, 'D': 1}

elements = list()
candidates = list()
root_type = list(node_counts.keys())[-1]
leaf_type = list(node_counts.keys())[0]
root_counts = node_counts[root_type]
leaves_count = node_counts[leaf_type]
ids = [i + 1 for i in range(sum(node_counts.values()))]
idcounter = 0
for i, (name, count) in enumerate(sorted(node_counts.items(), reverse=True)):
    for _ in range(count):
        _id = ids[idcounter]
        idcounter += 1
        _type = name
        if i == 0:
            _parent = None
        else:
            # select a random one that is not a root or a leaf
            if len(candidates) == 0:  # first bootstrap case
                candidate = random.choice(elements)
            else:
                candidate = random.choice(candidates)
            _parent = candidate['id']
        _obj = {'id': _id, 'type': _type, 'parent_id': _parent}
        #print(_obj)
        elements.append(_obj)
        if _type != root_type and _type != leaf_type:
            candidates.append(_obj)

df = pd.DataFrame.from_dict(elements).astype({'parent_id': 'Int64'})
```
To generate the tree paths in pure Python with the toy data above, you can use the following function:
```python
def get_hierarchy_path(df, cache_dict, ID='id', LABEL='type', PARENT_ID='parent_id', node_sep='|', elem_sep=':'):
    def get_path(record):
        if pd.isna(record[PARENT_ID]):
            return f'{record[LABEL]}{elem_sep}{record[ID]}'
        else:
            if record[PARENT_ID] in cache_dict:
                parent_path = cache_dict[record[PARENT_ID]]
            else:
                try:
                    parent_path = get_path(df.query(f'{ID} == {record[PARENT_ID]}').iloc[0])
                except IndexError:
                    print(f'Index Miss for {record[PARENT_ID]} on record {record.to_dict()}')
                    parent_path = f'{record[LABEL]}{elem_sep}{record[ID]}'
                cache_dict[record[PARENT_ID]] = parent_path
            return f"{parent_path}{node_sep}{record[LABEL]}{elem_sep}{record[ID]}"
    return df.apply(get_path, axis=1)

df['path'] = get_hierarchy_path(df, dict(), node_sep='>')
```
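Note that the recursive `df.query` lookup above rescans the frame on every cache miss. A variant that resolves parents through a plain dict (a sketch, not the original function; `get_paths_dict` is a hypothetical name) computes each path in amortized O(depth):

```python
import pandas as pd

def get_paths_dict(df, node_sep='>', elem_sep=':'):
    """Compute all tree paths via a plain dict lookup instead of df.query."""
    # id -> (type, parent_id); missing parents become None
    nodes = {r.id: (r.type, None if pd.isna(r.parent_id) else r.parent_id)
             for r in df.itertuples(index=False)}
    cache = {}

    def path_of(_id):
        if _id in cache:
            return cache[_id]
        typ, parent = nodes[_id]
        label = f'{typ}{elem_sep}{_id}'
        cache[_id] = label if parent is None else f'{path_of(parent)}{node_sep}{label}'
        return cache[_id]

    return df['id'].map(path_of)

# toy check
df = pd.DataFrame({'id': [1, 2, 3], 'type': ['D', 'C', 'C'],
                   'parent_id': [None, 1, 2]}).astype({'parent_id': 'Int64'})
print(get_paths_dict(df).tolist())  # → ['D:1', 'D:1>C:2', 'D:1>C:2>C:3']
```

For very deep trees the Python recursion limit may need raising (`sys.setrecursionlimit`), or the inner function can be rewritten iteratively.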
What I have already tried: the `graphframes` package, with which I can use BFS. That gives me a good solution for a single leaf node, but it does not scale to the whole network. Thanks for your help.
My current solution to this challenge no longer relies on Spark, but on SQL. I load the whole dataset into a Postgres DB and put a unique index on id, type, and parent_id.
With the following query I can then compute the paths:
```sql
with recursive recursive_hierarchy as (
    -- starting point
    select
        parent_id
        , id
        , type
        , type || ':' || id as path
        , 1 as lvl
    from hierarchy.nodes

    union all

    -- recursion
    select
        ne.parent_id as parent_id
        , h.id
        , h.type
        , ne.type || ':' || ne.id || '|' || h.path as path
        , h.lvl + 1 as lvl
    from (
        select *
        from hierarchy.nodes
    ) ne
    inner join recursive_hierarchy h
        on ne.id = h.parent_id
), paths as (
    -- complete results
    select *
    from recursive_hierarchy
), max_lvl as (
    -- retrieve the longest path of a network element
    select
        id
        , max(lvl) as max_lvl
    from paths
    group by id
)
-- all results with only the longest path of a network element
select distinct
    p.id
    , p.type
    , p.path
from paths p
inner join max_lvl l
    on p.id = l.id
    and p.lvl = l.max_lvl
```
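The recursive CTE can also be expressed as an iterative self-join, which is the pattern one would port to Spark DataFrames (each `merge` becomes a `join`, with one pass per tree level). A pandas sketch on the toy schema (names like `paths` and `cur` are illustrative):

```python
import pandas as pd

df = pd.DataFrame({'id': [1, 2, 3, 4], 'type': ['D', 'C', 'C', 'B'],
                   'parent_id': [None, 1, 2, 2]}).astype({'parent_id': 'Int64'})

# Seed every node with its own label; each merge prepends the next
# ancestor's label and climbs one level, until all rows reach a root.
paths = df[['id']].assign(path=df['type'] + ':' + df['id'].astype(str),
                          cur=df['parent_id'])
while paths['cur'].notna().any():
    m = paths.merge(df, left_on='cur', right_on='id',
                    how='left', suffixes=('', '_p'))
    hit = m['cur'].notna()
    m.loc[hit, 'path'] = (m.loc[hit, 'type'] + ':' +
                          m.loc[hit, 'cur'].astype(str) + '>' +
                          m.loc[hit, 'path'])
    paths = m[['id', 'path']].assign(cur=m['parent_id'])

print(paths['path'].tolist())  # paths for the toy tree
```

The number of iterations equals the tree depth, so for a shallow, wide tree this stays cheap even at millions of rows.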
I switched to a namedtuple instead of a plain tuple; removed the use of Pandas; added a zeroth root element to `elements`; and computed the names into a dictionary `id2name`.
With your small test dataset I get exactly the output you specified. For 3 million rows, generating the names takes slightly less time than generating the data rows.
```python
#!/bin/env python3
"""
Efficient Way to Build Large Scale Hierarchical Data Tree Path
"""
# %%
from collections import namedtuple
import random

random.seed(1337)

Row = namedtuple('Row', 'p_id, typ, n_id')  # Row of tree information
id2name = {0: ''}  # map n_id to their name, starting with root

node_counts = {'A': 1424383, 'B': 596994, 'C': 234745, 'D': 230937, 'E': 210663, 'F': 122859, 'G': 119453, 'H': 57462, 'I': 23260, 'J': 15008, 'K': 10666, 'L': 6943, 'M': 6724, 'N': 2371, 'O': 2005, 'P': 385}
#node_counts = {'A': 3, 'B': 2, 'C': 3, 'D': 1}

elements = [Row(None, "", 0)]  # off by one for root, correction
candidates = list()
root_type = list(node_counts.keys())[-1]
leaf_type = list(node_counts.keys())[0]
root_counts = node_counts[root_type]
leaves_count = node_counts[leaf_type]
ids = [i + 1 for i in range(sum(node_counts.values()))]
idcounter = 0
for i, (name, count) in enumerate(sorted(node_counts.items(), reverse=True)):
    for _ in range(count):
        _id = ids[idcounter]
        idcounter += 1
        _type = name
        if i == 0:
            _parent = 0  # no longer None; the root's parent is n_id 0
        else:
            # select a random one that is not a root or a leaf
            if len(candidates) == 0:  # first bootstrap case
                candidate = random.choice(elements)
            else:
                candidate = random.choice(candidates)
            _parent = candidate.n_id
        row = Row(_parent, _type, _id)  # new
        elements.append(row)
        if _type != root_type and _type != leaf_type:
            candidates.append(row)

# %%
def assign_name(row: Row) -> None:
    "Recursively assign row a name through its parents"
    p_id, typ, n_id = row
    if n_id not in id2name:
        parent = elements[p_id]
        if p_id in id2name:
            id2name[n_id] = f"{id2name[p_id]}>{typ}:{n_id}"
        else:
            assign_name(parent)
            assign_name(row)

for row in elements[1:]:
    assign_name(row)

del id2name[0]  # root not shown
for n_id, name in id2name.items():
    if name.startswith('>'):
        id2name[n_id] = name[1:]  # remove leading >
```