Parallel programming with Dask in Python/Sage for dynamic programming on a nice tree decomposition?

Problem description

I am trying to implement, in Sage/Python, an algorithm that counts graph homomorphisms from a graph G to a graph H using dynamic programming on a nice tree decomposition.

I have finished the algorithm, and now I would like to parallelize the program with Dask (I already tried concurrent.futures, but ran into some pickling problems). Note that dynamic programming on a nice tree decomposition comes with dependencies: we can only start computing the result of a parent node after the results of its child node(s) are available.
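
For what it is worth, here is a minimal standalone sketch (not the actual tree-decomposition code; leaf_entry and parent_entry are made-up names) of how Dask can be told about such dependencies: when a delayed parent task receives the child's Delayed object as an argument, Dask schedules the parent only after the child has finished, and inside the task body the argument is already the concrete value.

import dask

@dask.delayed
def leaf_entry():
    # stands in for the DP-table entry of a leaf node
    return [1, 0, 2]

@dask.delayed
def parent_entry(child_entry):
    # child_entry arrives here as a plain list, because Dask resolves the
    # Delayed argument before running this task, so len() and indexing work
    return [c + 1 for c in child_entry]

result = parent_entry(leaf_entry())
print(result.compute())  # prints [2, 1, 3]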

Parallel with Dask

def process_node(self, node):
    node_type = self.dir_labelled_TD.get_vertex(node)
    match node_type:
        case 'intro':
            result = self._add_intro_node_parallel(node)
        case 'forget':
            result = self._add_forget_node_parallel(node)
        case 'join':
            result = self._add_join_node_parallel(node)
        case _:
            result = self._add_leaf_node_parallel(node)

    node_index = get_node_index(node)
    self.DP_table[node_index] = result
    return result

def count_homomorphisms_parallel(self):
    # Dictionary to store all futures/promises
    self.futures = {}
    for node in reversed(self.dir_labelled_TD.vertices()):
        # Delaying each node process and storing in futures
        self.futures[node] = self.process_node(node)

    print("Futures: ", self.futures)

    # Compute all results, respecting the inherent dependencies among them
    results = dask.compute(*self.futures.values())
    print("Results: ", [f.compute() for f in results])
    return self.DP_table[0][0]

When I try to run the program in a notebook with the following example,

par_counter = ParallelGraphHomomorphismCounter(graph, three_grid)
par_count = par_counter.count_homomorphisms_parallel()
print(par_count)

the output is as follows. I am fairly new to concurrency in Python, and I have gone through the relevant documentation but still cannot figure it out.

So I was wondering whether you have any suggestions or ideas; thank you for your time! If you are curious, the non-parallel version is at the end of the post.

Futures:  {(6, {}): Delayed('process_node-0b571dcd-00e5-4871-871a-ef52e16b4ffb'), (5, {2}): Delayed('process_node-0fbc0886-3368-4d0e-8b09-751cce606ffe'), (4, {0, 2}): Delayed('process_node-0187a2da-aba7-42f7-83ab-497f62ea6b1f'), (3, {0}): Delayed('process_node-18729eea-99f4-45ee-af15-0de45395f181'), (2, {0, 1}): Delayed('process_node-8c85c333-301d-4e49-b9d4-c01bc20c05ae'), (1, {0}): Delayed('process_node-7e528db6-4636-4b31-8eb1-6f807ac32627'), (0, {}): Delayed('process_node-6bb90670-11b4-4e40-b7bc-cefb2fef6479')}

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In [3], line 28
     23 # colour_counter = GraphHomomorphismCounter(square, three_grid, 2, square_clr, three_grid_clr, colourful=True)
     24 # colourful_count = colour_counter.count_homomorphisms_best()
     25 # print(colourful_count)
     27 par_counter = ParallelGraphHomomorphismCounter(graph, three_grid)
---> 28 par_count = par_counter.count_homomorphisms_parallel()
     29 print(par_count)

File ~/github/local-hom-count/local_hom_count_best_parallel.py:118, in ParallelGraphHomomorphismCounter.count_homomorphisms_parallel(self)
    116 # Compute all results, respecting the inherent dependencies among them
    117 results = dask.compute(*self.futures.values())
--> 118 print("Results: ", [f.compute() for f in results])
    119 # Since the results are integrated into the DP_table in each process_node call,
    120 # you can simply access the final result:
    121 return self.DP_table[0][0]

File ~/github/local-hom-count/local_hom_count_best_parallel.py:118, in <listcomp>(.0)
    116 # Compute all results, respecting the inherent dependencies among them
    117 results = dask.compute(*self.futures.values())
--> 118 print("Results: ", [f.compute() for f in results])
    119 # Since the results are integrated into the DP_table in each process_node call,
    120 # you can simply access the final result:
    121 return self.DP_table[0][0]

File ~/.sage/local/lib/python3.11/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
    351 def compute(self, **kwargs):
    352     """Compute this dask collection
    353 
    354     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    373     dask.compute
    374     """
--> 375     (result,) = compute(self, traverse=False, **kwargs)
    376     return result

File ~/.sage/local/lib/python3.11/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    658     postcomputes.append(x.__dask_postcompute__())
    660 with shorten_traceback():
--> 661     results = schedule(dsk, keys, **kwargs)
    663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/github/local-hom-count/local_hom_count_best_parallel.py:239, in ParallelGraphHomomorphismCounter._add_intro_node_parallel(self, node)
    235 child_DP_entry = self.DP_table[child_node_index]
    236 # print("INTRO child DP entry: ", child_DP_entry)
    237 # print("\n")
--> 239 for mapped in range(len(child_DP_entry)):
    240     # Neighborhood of the mapped vertices of intro vertex in the target graph
    241     mapped_intro_nbhs = [extract_bag_vertex(mapped, vtx, self.actual_target_size) for vtx in intro_vtx_nbhs]
    242     # print("mapped: ", mapped)
    243     # print("mapped nbhs in target: ", mapped_intro_nbhs)

File ~/.sage/local/lib/python3.11/site-packages/dask/delayed.py:635, in Delayed.__len__(self)
    633 def __len__(self):
    634     if self._length is None:
--> 635         raise TypeError("Delayed objects of unspecified length have no len()")
    636     return self._length

TypeError: Delayed objects of unspecified length have no len()

Non-parallel

def count_homomorphisms_best(self):
    r"""
    Return the number of homomorphisms from the graph `G` to the graph `H`.

    A homomorphism from a graph `G` to a graph `H` is a function
    `\varphi : V(G) \mapsto V(H)`, such that for any edge `uv \in E(G)` the
    pair `\varphi(u) \varphi(v)` is an edge of `H`.

    For more information, see the :wikipedia:`Graph_homomorphism`.

    ALGORITHM:

    This is an implementation based on the proof of Prop. 1.6 in [CDM2017]_.

    OUTPUT:

    - an integer, the number of homomorphisms from `graph` to `target_graph`

    EXAMPLES::

        sage: graph = graphs.CompleteBipartiteGraph(1, 4)
        sage: target_graph = graphs.CompleteGraph(4)
        sage: from sage.graphs.hom_count_best import count_homomorphisms_best
        sage: count_homomorphisms_best(graph, target_graph)
        324
    """
    # Whether it's BFS or DFS, every node below join node(s) would be
    # computed first, so we can safely go bottom-up.
    for node in reversed(self.dir_labelled_TD.vertices()):
        node_type = self.dir_labelled_TD.get_vertex(node)
        # print("\nNode: ", node, node_type)

        match node_type:
            case 'intro':
                self._add_intro_node_best(node)
            case 'forget':
                self._add_forget_node_best(node)
            case 'join':
                self._add_join_node_best(node)

            case _: 
                self._add_leaf_node_best(node)

    return self.DP_table[0][0]
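
For comparison with the parallel example above, here is a hypothetical usage sketch of the sequential counter; the class name GraphHomomorphismCounter is taken from the commented-out lines in the traceback, and the two-argument constructor is assumed to mirror the parallel one.

# Hypothetical usage; assumes GraphHomomorphismCounter takes the same two
# arguments as ParallelGraphHomomorphismCounter in the example above.
seq_counter = GraphHomomorphismCounter(graph, three_grid)
seq_count = seq_counter.count_homomorphisms_best()
print(seq_count)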
Tags: python, math, concurrency, parallel-processing, dask
1 Answer

The error comes from a block that is not in your MCVE!

for mapped in range(len(child_DP_entry)):
    mapped_intro_nbhs = [extract_bag_vertex(mapped, vtx, self.actual_target_size) for vtx in intro_vtx_nbhs]

However, you can replace it with enumerate() or another loop construct that does not require the total length:

for index, entry in enumerate(child_DP_entry):
    # work with `index`; `entry` can be ignored if it is not needed
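
For illustration only, here are the two loop forms side by side on an ordinary, already-computed DP entry (the values are made up): enumerate() merely avoids the explicit len() call, which is exactly the call a Delayed of unspecified length cannot answer.

child_DP_entry = [5, 0, 3]  # hypothetical, already-computed DP-table row

# original form: needs len(), which a Delayed of unspecified length refuses
for mapped in range(len(child_DP_entry)):
    print(mapped, child_DP_entry[mapped])

# suggested form: only needs iteration, no explicit length
for mapped, count in enumerate(child_DP_entry):
    print(mapped, count)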