Context:具有大量任务和依赖项的 ETL,以面向源代码进行优化。
我有一个使用 python pkg NetworkX 定义的有向图(DiGraph),表示各个节点(表)之间的依赖关系。
我计划使用该 dag 来生成优化工作流程,具体取决于数据源的可用性。
假设我有两个数据集合作为输入:
目前,我正在等待早上 6 点触发一切,但业务需要提早数据。
这就是为什么我正在寻找一种方式来移动该图,以生成具有最大并行性的优化工作流程面向源(工作流程之间和工作流程内部)。
通过简单但可并行的工作流程生成图形的快速代码:
import networkx as nx
G = nx.DiGraph()
G.add_edge("collect_A", "job_bronze_A")
G.add_edge("collect_B", "job_bronze_B")
G.add_edge("job_bronze_A", "job_silver_A")
G.add_edge("job_bronze_B", "job_silver_B")
G.add_edge("job_silver_A", "job_gold_A")
G.add_edge("job_silver_B", "job_gold_A")
G.add_edge("job_silver_A", "job_gold_B")
G.add_edge("job_silver_B", "job_gold_B")
topological_sort = list(nx.topological_sort(G))
print("topological_sort : ",topological_sort)
generations = [(generation) for generation in nx.topological_generations(G)]
print("generations : ",generations)
有了这个简单的 DAG,我的目标是分成:
Workflow_A : {
"tasks" : [{
"task":"collect_A",
"dependOn":[]
},
{
"task":"job_bronze_A",
"dependOn":["collect_A"]
},
{
"task":"job_silver_A",
"dependOn":["job_bronze_A"]
}],
"dependOn" : []
}
Workflow_B : {
"tasks" : [{
"task":"collect_B",
"dependOn":[]
},
{
"task":"job_bronze_B",
"dependOn":["collect_B"]
},
{
"task":"job_silver_C",
"dependOn":["job_bronze_C"]
}],
"dependOn" : []
}
Workflow_C : {
"tasks" : [{
"task":"job_gold_A",
"dependOn":[]
},
{
"task":"job_gold_B",
"dependOn":[]
}],
"dependOn" : ["Workflow_A","Workflow_B"]
}
有了这个结果,我就能够同时定义我的全局工作流程:
A (parallel)
B (parallel)
C (dependOn A & B)
我尝试使用 nx.topological_sort(),但它返回整个图的顺序。
topological_sort : ['collect_A', 'collect_B', 'job_bronze_A', 'job_bronze_B', 'job_silver_A', 'job_silver_B', 'job_gold_A', 'job_gold_B']
我还尝试了topological_ Generations,它对这样的作业进行分组:
generations : [['collect_A', 'collect_B'], ['job_bronze_A', 'job_bronze_B'], ['job_silver_A', 'job_silver_B'], ['job_gold_A', 'job_gold_B']]
我对此并不满意。作业按工作流程分组并按顺序执行。与此没有并行性。 当我希望存在图遍历算法时,我不想深入研究需要时间的自定义代码。
你有什么想法吗?
谢谢你。
您必须定义入口点(“collect_A”、“collect_B”),然后检查每个作业的祖先列表中有多少个入口点。
import networkx as nx
G = nx.DiGraph()
G.add_edge("collect_A", "job_bronze_A")
G.add_edge("collect_B", "job_bronze_B")
G.add_edge("job_bronze_A", "job_silver_A")
G.add_edge("job_bronze_B", "job_silver_B")
G.add_edge("job_silver_A", "job_gold_A")
G.add_edge("job_silver_B", "job_gold_A")
G.add_edge("job_silver_A", "job_gold_B")
G.add_edge("job_silver_B", "job_gold_B")
A = []
B = []
C = []
for node in nx.topological_sort(G):
ancestors = nx.ancestors(G, node)
if ("collect_A" in ancestors) & ("collect_B" in ancestors):
C.append(node)
elif ("collect_A" in ancestors) | (node == "collect_A"):
A.append(node)
elif ("collect_B" in ancestors) | (node == "collect_B"):
B.append(node)
print(f"A : {A}")
print(f"B : {B}")
print(f"C : {C}")
# A : ['collect_A', 'job_bronze_A', 'job_silver_A']
# B : ['collect_B', 'job_bronze_B', 'job_silver_B']
# C : ['job_gold_A', 'job_gold_B']
如果这是一次性解决方案,手动定义计算分支(就像我上面所做的那样)就可以了。如果这是一个反复出现的问题和/或计算图非常复杂,您可能需要研究适当的管道框架,例如snakemake和ruffus。就我个人而言,我喜欢 ruffus 的简单性,但 Snakemake 无疑是这两个库中更流行、更精致的一个。