How do I parallelize a nested loop with dask.distributed?


I am trying to use dask distributed to parallelize a nested loop that looks like this:

import dask

@dask.delayed
def delayed_a(e):
    a = do_something_with(e)
    return a

@dask.delayed
def delayed_b(element):
    computations = []
    for e in element:
        computations.append(delayed_a(e))

    # compute() is being called from inside a delayed function here
    b = dask.compute(*computations, scheduler='distributed',
                     num_workers=4)
    return b

list = [some thousands of elements here]
computations = []
for element in list:
    computations.append(delayed_b(element))

results = dask.compute(*computations, scheduler='distributed',
                       num_workers=4)

As you can see, I am using the distributed scheduler. First, I build a computations list containing lazy delayed_b calls, each taking one element of list as its argument. Then delayed_b builds a new set of computations that call delayed_a, and everything is executed with the distributed scheduler. This pseudocode runs, but I found it is faster when delayed_a is not there. So my question is: what is the correct way to parallelize nested loops with the distributed scheduler?

In the end, what I want to do is:

list = [some thousands of elements here]
for element in list:
    for e in element:
        do_something_with(e)

I would really appreciate any advice on the best way to handle nested loops with dask.distributed.

python-3.x parallel-processing dask dask-distributed dask-delayed
1 Answer

Simply:

import dask

# wrap the worker function with dask.delayed once, outside the loops
something = dask.delayed(do_something_with)
list = [some thousands of elements here]

# this could be written as a one-line comprehension
computations = []
for element in list:
    part = []
    computations.append(part)
    for e in element:
        part.append(something(e))

results = dask.compute(*computations, scheduler='distributed',
                       num_workers=4)
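
As the comment above says, the nested structure can also be built in a single comprehension. A minimal sketch, reusing the placeholder list and the `something` wrapper from the snippet above:

computations = [[something(e) for e in element] for element in list]
results = dask.compute(*computations, scheduler='distributed',
                       num_workers=4)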

You should never call a delayed function, or compute(), from inside another delayed function.

(Note that the distributed scheduler is used by default as soon as you have created a client.)
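
For illustration, a minimal sketch of that, assuming a local cluster is acceptable (n_workers=4 simply mirrors the num_workers=4 from the question, and computations is the list built above):

from dask.distributed import Client

client = Client(n_workers=4)           # starts a local cluster and registers it as the default scheduler
results = dask.compute(*computations)  # no scheduler= argument needed once a client exists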
