Graph / DAG:并行执行工作流程优化

问题描述 投票:0回答:1

Context:具有大量任务和依赖项的 ETL,以面向源代码进行优化。

我有一个使用 python pkg NetworkX 定义的有向图(DiGraph),表示各个节点(表)之间的依赖关系。

我计划使用该 dag 来生成优化工作流程,具体取决于数据源的可用性。

假设我有两个数据集合作为输入:

  • 一个在凌晨 5 点准备就绪(collect_A
  • 第二场是早上 6 点。 (收集_B)

目前,我正在等待早上 6 点触发一切,但业务需要提早数据。

这就是为什么我正在寻找一种方式来移动该图,以生成具有最大并行性的优化工作流程面向源(工作流程之间和工作流程内部)。

通过简单但可并行的工作流程生成图形的快速代码:

import networkx as nx

G = nx.DiGraph()

G.add_edge("collect_A", "job_bronze_A")
G.add_edge("collect_B", "job_bronze_B")

G.add_edge("job_bronze_A", "job_silver_A")
G.add_edge("job_bronze_B", "job_silver_B")

G.add_edge("job_silver_A", "job_gold_A")
G.add_edge("job_silver_B", "job_gold_A")
G.add_edge("job_silver_A", "job_gold_B")
G.add_edge("job_silver_B", "job_gold_B")


topological_sort = list(nx.topological_sort(G))
print("topological_sort : ",topological_sort)

generations = [(generation) for generation in nx.topological_generations(G)]
print("generations : ",generations)

有了这个简单的 DAG,我的目标是分成:

Workflow_A : {
    "tasks" : [{
        "task":"collect_A",
        "dependOn":[]
    },
    {
        "task":"job_bronze_A",
        "dependOn":["collect_A"]
    },
    {
        "task":"job_silver_A",
        "dependOn":["job_bronze_A"]

    }],
    "dependOn" : []
}


Workflow_B : {
    "tasks" : [{
        "task":"collect_B",
        "dependOn":[]
    },
    {
        "task":"job_bronze_B",
        "dependOn":["collect_B"]
    },
    {
        "task":"job_silver_C",
        "dependOn":["job_bronze_C"]

    }],
    "dependOn" : []
}


Workflow_C : {
    "tasks" : [{
        "task":"job_gold_A",
        "dependOn":[]
    },
    {
        "task":"job_gold_B",
        "dependOn":[]
    }],
    "dependOn" : ["Workflow_A","Workflow_B"]
}

有了这个结果,我就能够同时定义我的全局工作流程:

A (parallel)

B (parallel)

C (dependOn A & B)

我尝试使用 nx.topological_sort(),但它返回整个图的顺序。

topological_sort :  ['collect_A', 'collect_B', 'job_bronze_A', 'job_bronze_B', 'job_silver_A', 'job_silver_B', 'job_gold_A', 'job_gold_B']

我还尝试了topological_ Generations,它对这样的作业进行分组:

generations :  [['collect_A', 'collect_B'], ['job_bronze_A', 'job_bronze_B'], ['job_silver_A', 'job_silver_B'], ['job_gold_A', 'job_gold_B']]

我对此并不满意。作业按工作流程分组并按顺序执行。与此没有并行性。 当我希望存在图遍历算法时,我不想深入研究需要时间的自定义代码。

你有什么想法吗?

谢谢你。

charts tree networkx workflow directed-acyclic-graphs
1个回答
0
投票

您必须定义入口点(“collect_A”、“collect_B”),然后检查每个作业的祖先列表中有多少个入口点。

import networkx as nx

G = nx.DiGraph()

G.add_edge("collect_A", "job_bronze_A")
G.add_edge("collect_B", "job_bronze_B")

G.add_edge("job_bronze_A", "job_silver_A")
G.add_edge("job_bronze_B", "job_silver_B")

G.add_edge("job_silver_A", "job_gold_A")
G.add_edge("job_silver_B", "job_gold_A")
G.add_edge("job_silver_A", "job_gold_B")
G.add_edge("job_silver_B", "job_gold_B")

A = []
B = []
C = []
for node in nx.topological_sort(G):
    ancestors = nx.ancestors(G, node)
    if ("collect_A" in ancestors) & ("collect_B" in ancestors):
        C.append(node)
    elif ("collect_A" in ancestors) | (node == "collect_A"):
        A.append(node)
    elif ("collect_B" in ancestors) | (node == "collect_B"):
        B.append(node)

print(f"A : {A}")
print(f"B : {B}")
print(f"C : {C}")
# A : ['collect_A', 'job_bronze_A', 'job_silver_A']
# B : ['collect_B', 'job_bronze_B', 'job_silver_B']
# C : ['job_gold_A', 'job_gold_B']

如果这是一次性解决方案,手动定义计算分支(就像我上面所做的那样)就可以了。如果这是一个反复出现的问题和/或计算图非常复杂,您可能需要研究适当的管道框架,例如snakemake和ruffus。就我个人而言,我喜欢 ruffus 的简单性,但 Snakemake 无疑是这两个库中更流行、更精致的一个。

© www.soinside.com 2019 - 2024. All rights reserved.