我有一个包含100个元组tuplelist
的列表,它们用作外部函数的输入。外部函数返回一个值,并将该值附加到数组中,例如(MainFile.py
):
from ExternalPythonFile import ExternalFunction
valuelist = []
for a,b in tuplelist:
value = ExternalFunction(a,b)
# more functions here
valuelist.append(value)
print(len(valuelist))
使用上述for循环时,print(len(valuelist))
的输出为(100,)
。
现在,由于在我的情况下元组的顺序及其附加方式无关紧要,我想并行处理for循环,因为处理100个元组大约需要10分钟,并且我希望扩展该数量。我已经在下面的[MainFileJoblib.py
)中尝试了一个joblib实现:
from ExternalPythonFile import ExternalFunction
from joblib import Parallel, delayed, parallel_backend
import multiprocessing
valuelist = []
def TupleFunction(a,b):
value = ExternalFunction(a,b)
# more functions here
valuelist.append(value)
with parallel_backend('multiprocessing'):
Parallel(n_jobs=10)(delayed(TupleFunction)(a,b) for a,b in tuplelist)
print(len(valuelist))
我正在UNIX计算集群上运行所有这些,但是运行时间在大约8分钟时仍然相似。输出也是错误的,它打印为(0,)
。
[看htop
,我发现实际上有10个内核正在使用,但每个内核只使用了20%。
我也试图通过SLURM运行joblib实现:
srun --ntasks=1 --ncpus-per-task=10 python3 MainFileJoblib.py
这绝对快于大约2分钟,但再次给出了错误的结果(0,)
。
并行处理原始for循环的最佳方法是什么?
Joblib自己管理输出列表的创建和填充,因此可以轻松地通过以下方式固定代码:
from ExternalPythonFile import ExternalFunction
from joblib import Parallel, delayed, parallel_backend
import multiprocessing
with parallel_backend('multiprocessing'):
valuelist = Parallel(n_jobs=10)(delayed(ExternalFunction)(a, b) for a, b in tuplelist)
print(len(valuelist))
如果出于某种原因需要更新类似数组的对象,则可以按照以下最小示例使用numpy memmap:
import tempfile
import numpy as np
from ExternalPythonFile import ExternalFunction
from joblib import Parallel, delayed, parallel_backend
import multiprocessing
# define function to update your array
def fill_array(mm_file, i, tuple_val):
a, b = tuple_val
value = ExternalFunction(a, b)
# more functions here
mm_file[i] = value
# create a temporary folder
tmp_dir = tempfile.mkdtemp()
# create a file where to dump your array
values_fname_memmap = Path(tmp_dir).joinpath("values_memmap")
values_memmap = np.memmap(values_fname_memmap.as_posix(),
dtype=np.float,
shape=(len(tuplelist), ),
mode='w+')
with parallel_backend('multiprocessing'):
Parallel(n_jobs=10)(delayed(fill_array)(values_memmap, i, ab)
for i, ab in enumerate(tuplelist))
print(len(values_memmap))
如果需要对值应用一组转换(#个更多函数),只需对ExternalFunction进行包装,为给定的元组(a,b)输出所需的值。
我希望尽管回复很晚,但对您仍然有用。