我试图使用看起来像这样的dask分布并行化嵌套循环:
@dask.delayed
def delayed_a(e):
a = do_something_with(e)
return something
@dask.delayed
def delayed_b(element):
computations = []
for e in element:
computations.add(delayed_a(e))
b = dask.compute(*computations, scheduler='distributed',
num_workers=4)
return b
list = [some thousands of elements here]
computations = []
for element in list:
computations.append(delayed_b(element))
results = dask.compute(*computations, scheduler='distributed',
num_workers=4)
如你所见,我正在使用distributed
调度程序。首先,我创建一个computations
列表,其中包含一个懒惰的delayed_b
函数,该函数将list
中的一个元素作为参数。然后,delayed_b
创建了一组新的computations
,它们调用delayed_a
函数,所有内容都以分布式执行。这个伪代码正在运行,但我发现如果delayed_a
不在那里会更快。那么我的问题是 - 做循环分布式并行的正确方法是什么?
在历史的最后,我想要做的是:
list = [some thousands of elements here]
for element in list:
for e in element:
do_something_with(e)
我真的很感激有关使用dask.distributed
完成嵌套循环的最佳方法的任何建议。
简单:
something = dask.delayed(do_something_with_e
list = [some thousands of elements here]
# this could be written as a one-line comprehension
computations = []
for element in list:
part = []
computations.append(part)
for e in element:
part.append(something(e))
results = dask.compute(*computations, scheduler='distributed',
num_workers=4)
你永远不应该在延迟函数中调用延迟函数或compute()
。
(请注意,只要您创建了客户端,默认情况下就会使用分布式调度程序)