[使用joblib和SLURM在Python中并行化for循环

问题描述 投票:0回答:1

我有一个包含100个元组tuplelist的列表,它们用作外部函数的输入。外部函数返回一个值,并将该值附加到数组中,例如(MainFile.py):

from ExternalPythonFile import ExternalFunction

valuelist = []
for a,b in tuplelist:
    value = ExternalFunction(a,b)
    # more functions here
    valuelist.append(value)
print(len(valuelist))

使用上述for循环时,print(len(valuelist))的输出为(100,)

现在,由于在我的情况下元组的顺序及其附加方式无关紧要,我想并行处理for循环,因为处理100个元组大约需要10分钟,并且我希望扩展该数量。我已经在下面的[MainFileJoblib.py)中尝试了一个joblib实现:

from ExternalPythonFile import ExternalFunction
from joblib import Parallel, delayed, parallel_backend
import multiprocessing

valuelist = []

def TupleFunction(a,b):
        value = ExternalFunction(a,b)
        # more functions here
        valuelist.append(value)

with parallel_backend('multiprocessing'):
    Parallel(n_jobs=10)(delayed(TupleFunction)(a,b) for a,b in tuplelist)

print(len(valuelist))

我正在UNIX计算集群上运行所有这些,但是运行时间在大约8分钟时仍然相似。输出也是错误的,它打印为(0,)

[看htop,我发现实际上有10个内核正在使用,但每个内核只使用了20%。

我也试图通过SLURM运行joblib实现:

srun --ntasks=1 --ncpus-per-task=10 python3 MainFileJoblib.py

这绝对快于大约2分钟,但再次给出了错误的结果(0,)

并行处理原始for循环的最佳方法是什么?

python for-loop parallel-processing slurm joblib
1个回答
0
投票

Joblib自己管理输出列表的创建和填充,因此可以轻松地通过以下方式固定代码:

from ExternalPythonFile import ExternalFunction
from joblib import Parallel, delayed, parallel_backend
import multiprocessing


with parallel_backend('multiprocessing'):
    valuelist = Parallel(n_jobs=10)(delayed(ExternalFunction)(a, b) for a, b in tuplelist)

print(len(valuelist))

如果出于某种原因需要更新类似数组的对象,则可以按照以下最小示例使用numpy memmap:

import tempfile
import numpy as np
from ExternalPythonFile import ExternalFunction
from joblib import Parallel, delayed, parallel_backend
import multiprocessing


# define function to update your array
def fill_array(mm_file, i, tuple_val):
    a, b = tuple_val
    value = ExternalFunction(a, b)
    # more functions here
    mm_file[i] = value

# create a temporary folder
tmp_dir = tempfile.mkdtemp()
# create a file where to dump your array
values_fname_memmap = Path(tmp_dir).joinpath("values_memmap")
values_memmap = np.memmap(values_fname_memmap.as_posix(),
                          dtype=np.float,
                          shape=(len(tuplelist), ),
                          mode='w+')

with parallel_backend('multiprocessing'):
    Parallel(n_jobs=10)(delayed(fill_array)(values_memmap, i, ab) 
                        for i, ab in enumerate(tuplelist))

print(len(values_memmap))

如果需要对值应用一组转换(#个更多函数),只需对ExternalFunction进行包装,为给定的元组(a,b)输出所需的值。

我希望尽管回复很晚,但对您仍然有用。

© www.soinside.com 2019 - 2024. All rights reserved.