我的问题如下。我有一个大数组“B”,我希望对其进行并行操作。我的功能看起来像
def test(a,B):
entry = B[a,a]
#insert complicated math here
result = b[a]
return result
我知道,如果没有参数,我可以简单地使用 process_map,如下所示:
parallel_results_tqdm = process_map(test, agrid,max_workers=4,chunksize=1).
agrid 是我希望循环的“a”列表。如果变量 B 是全局变量,则这将起作用。但是,我希望在不同的数组上运行该操作并将其作为函数提供。
一个行不通的解决方案是将我的输入列表作为元组传递,这样
agrid = [(0,B),(1,B),(2,B),(3,B)....]
但是在我的例子中 B 很大,所以以这种方式克隆它会导致内存错误。有没有办法传递 process_map 参数而不以这种方式克隆它?
我建议将大数组
B
传递给每个进程,以使数组 在进程之间共享。为此,您可以使用 multiprocessing.Array
,例如:
import ctypes
from multiprocessing import Array
from time import sleep
import numpy as np
from tqdm.contrib.concurrent import process_map
N = 1_000
B = None
def test(a):
arr = np.frombuffer(B.get_obj()).reshape((N, N))
# complicated function
sleep(1)
# if you want to write to the shared array use a lock:
# with arr.get_lock():
# arr[a, a] = ...
return arr[a, a]
if __name__ == "__main__":
B = Array(ctypes.c_double, N * N)
arr = np.frombuffer(B.get_obj()).reshape((N, N))
arr[:] = np.random.uniform(size=(N, N))
parallel_results_tqdm = process_map(
test,
[1, 2, 3, 4],
max_workers=2,
chunksize=1,
)
print(parallel_results_tqdm)