I am trying to implement an algorithm in Sage/Python that counts graph homomorphisms from a graph G to a graph H, using dynamic programming over a nice tree decomposition. The algorithm itself is done, and I would now like to parallelize the program with Dask (I already tried concurrent.futures, but ran into some pickling issues). Note that the dynamic programming over a nice tree decomposition comes with dependencies: we can only start computing the result for a parent node after the results of its child(ren) nodes are available.
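To make that dependency concrete, here is a tiny self-contained toy (a made-up tree, not the actual DP): the nodes are processed bottom-up, so every child's table entry already exists when its parent is computed.

# Toy tree: children listed per node; the processing order puts children before parents
children = {'root': ['a', 'b'], 'a': [], 'b': []}
order = ['a', 'b', 'root']

DP_table = {}
for node in order:
    # all children of `node` are already in DP_table at this point
    DP_table[node] = 1 + sum(DP_table[c] for c in children[node])

print(DP_table['root'])  # -> 3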
Parallel version with Dask:
def process_node(self, node):
    node_type = self.dir_labelled_TD.get_vertex(node)
    match node_type:
        case 'intro':
            result = self._add_intro_node_parallel(node)
        case 'forget':
            result = self._add_forget_node_parallel(node)
        case 'join':
            result = self._add_join_node_parallel(node)
        case _:
            result = self._add_leaf_node_parallel(node)

    node_index = get_node_index(node)
    self.DP_table[node_index] = result
    return result

def count_homomorphisms_parallel(self):
    # Dictionary to store all futures/promises
    self.futures = {}

    for node in reversed(self.dir_labelled_TD.vertices()):
        # Delaying each node process and storing in futures
        self.futures[node] = dask.delayed(self.process_node)(node)

    print("Futures: ", self.futures)

    # Compute all results, respecting the inherent dependencies among them
    results = dask.compute(*self.futures.values())
    print("Results: ", [f.compute() for f in results])
    return self.DP_table[0][0]
When I try to run the program in a notebook with the following example,
par_counter = ParallelGraphHomomorphismCounter(graph, three_grid)
par_count = par_counter.count_homomorphisms_parallel()
print(par_count)
I get the output below. I am fairly new to concurrency in Python and have gone through the relevant documentation, but I still cannot figure this out.
So I am wondering whether you have any suggestions or ideas; thank you for your time! (If you are curious, the non-parallel version is at the end of the post.)
Futures: {(6, {}): Delayed('process_node-0b571dcd-00e5-4871-871a-ef52e16b4ffb'), (5, {2}): Delayed('process_node-0fbc0886-3368-4d0e-8b09-751cce606ffe'), (4, {0, 2}): Delayed('process_node-0187a2da-aba7-42f7-83ab-497f62ea6b1f'), (3, {0}): Delayed('process_node-18729eea-99f4-45ee-af15-0de45395f181'), (2, {0, 1}): Delayed('process_node-8c85c333-301d-4e49-b9d4-c01bc20c05ae'), (1, {0}): Delayed('process_node-7e528db6-4636-4b31-8eb1-6f807ac32627'), (0, {}): Delayed('process_node-6bb90670-11b4-4e40-b7bc-cefb2fef6479')}
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Cell In [3], line 28
23 # colour_counter = GraphHomomorphismCounter(square, three_grid, 2, square_clr, three_grid_clr, colourful=True)
24 # colourful_count = colour_counter.count_homomorphisms_best()
25 # print(colourful_count)
27 par_counter = ParallelGraphHomomorphismCounter(graph, three_grid)
---> 28 par_count = par_counter.count_homomorphisms_parallel()
29 print(par_count)
File ~/github/local-hom-count/local_hom_count_best_parallel.py:118, in ParallelGraphHomomorphismCounter.count_homomorphisms_parallel(self)
116 # Compute all results, respecting the inherent dependencies among them
117 results = dask.compute(*self.futures.values())
--> 118 print("Results: ", [f.compute() for f in results])
119 # Since the results are integrated into the DP_table in each process_node call,
120 # you can simply access the final result:
121 return self.DP_table[0][0]
File ~/github/local-hom-count/local_hom_count_best_parallel.py:118, in <listcomp>(.0)
116 # Compute all results, respecting the inherent dependencies among them
117 results = dask.compute(*self.futures.values())
--> 118 print("Results: ", [f.compute() for f in results])
119 # Since the results are integrated into the DP_table in each process_node call,
120 # you can simply access the final result:
121 return self.DP_table[0][0]
File ~/.sage/local/lib/python3.11/site-packages/dask/base.py:375, in DaskMethodsMixin.compute(self, **kwargs)
351 def compute(self, **kwargs):
352 """Compute this dask collection
353
354 This turns a lazy Dask collection into its in-memory equivalent.
(...)
373 dask.compute
374 """
--> 375 (result,) = compute(self, traverse=False, **kwargs)
376 return result
File ~/.sage/local/lib/python3.11/site-packages/dask/base.py:661, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
658 postcomputes.append(x.__dask_postcompute__())
660 with shorten_traceback():
--> 661 results = schedule(dsk, keys, **kwargs)
663 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/github/local-hom-count/local_hom_count_best_parallel.py:239, in ParallelGraphHomomorphismCounter._add_intro_node_parallel(self, node)
235 child_DP_entry = self.DP_table[child_node_index]
236 # print("INTRO child DP entry: ", child_DP_entry)
237 # print("\n")
--> 239 for mapped in range(len(child_DP_entry)):
240 # Neighborhood of the mapped vertices of intro vertex in the target graph
241 mapped_intro_nbhs = [extract_bag_vertex(mapped, vtx, self.actual_target_size) for vtx in intro_vtx_nbhs]
242 # print("mapped: ", mapped)
243 # print("mapped nbhs in target: ", mapped_intro_nbhs)
File ~/.sage/local/lib/python3.11/site-packages/dask/delayed.py:635, in Delayed.__len__(self)
633 def __len__(self):
634 if self._length is None:
--> 635 raise TypeError("Delayed objects of unspecified length have no len()")
636 return self._length
TypeError: Delayed objects of unspecified length have no len()
Non-parallel version:
def count_homomorphisms_best(self):
    r"""
    Return the number of homomorphisms from the graph `G` to the graph `H`.

    A homomorphism from a graph `G` to a graph `H` is a function
    `\varphi : V(G) \mapsto V(H)`, such that for any edge `uv \in E(G)` the
    pair `\varphi(u) \varphi(v)` is an edge of `H`.

    For more information, see the :wikipedia:`Graph_homomorphism`.

    ALGORITHM:

    This is an implementation based on the proof of Prop. 1.6 in [CDM2017]_.

    OUTPUT:

    - an integer, the number of homomorphisms from `graph` to `target_graph`

    EXAMPLES::

        sage: graph = graphs.CompleteBipartiteGraph(1, 4)
        sage: target_graph = graphs.CompleteGraph(4)
        sage: from sage.graphs.hom_count_best import count_homomorphisms_best
        sage: count_homomorphisms_best(graph, target_graph)
        324
    """
    # Whether it's BFS or DFS, every node below join node(s) would be
    # computed first, so we can safely go bottom-up.
    for node in reversed(self.dir_labelled_TD.vertices()):
        node_type = self.dir_labelled_TD.get_vertex(node)
        # print("\nNode: ", node, node_type)

        match node_type:
            case 'intro':
                self._add_intro_node_best(node)
            case 'forget':
                self._add_forget_node_best(node)
            case 'join':
                self._add_join_node_best(node)
            case _:
                self._add_leaf_node_best(node)

    return self.DP_table[0][0]
The error comes from a block that is not in your MCVE!
for mapped in range(len(child_DP_entry)):
    mapped_intro_nbhs = [extract_bag_vertex(mapped, vtx, self.actual_target_size) for vtx in intro_vtx_nbhs]
However, you can replace it with enumerate() or another loop construct that does not need the total length up front:
for index, entry in enumerate(child_DP_entry):
    # work with `index`; `entry` can be ignored
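A minimal, self-contained illustration of that loop pattern (child_DP_entry below is just a placeholder list, not the real DP-table entry):

child_DP_entry = [3, 1, 4, 1, 5]   # placeholder data

# instead of: for mapped in range(len(child_DP_entry)):
for mapped, entry in enumerate(child_DP_entry):
    # `mapped` plays the role of the old loop index; `entry` can be ignored if unused
    print(mapped)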