How can I make all worker processes share the same object, given that none of them will ever modify it? For example, what is the cleanest way to write a function that computes the dot product of its argument with a second vector that is the same in every process? Naively, I would write something like this:
import multiprocessing
import numpy as np

def main():
    static_vector = np.array([1, 2, 3, 4, 5])

    def f(v):
        return np.dot(v, static_vector)

    with multiprocessing.Pool() as p:
        results = p.map(f, [np.random.random(5) for _ in range(10)])
    print(results)

if __name__ == "__main__":
    main()
but it fails with the error AttributeError: Can't pickle local object 'main.<locals>.f'. For the sake of argument, assume that computing static_vector takes a while and should not be repeated in every subprocess.
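The failure appears to be about the nested function itself: Pool.map pickles the callable, and a function defined inside main cannot be pickled, while a module-level one can. A minimal sketch isolating that constraint with plain pickle (no Pool involved; the function names here are only for illustration):

import pickle

def top_level(v):
    return 2 * v

def main():
    def nested(v):
        return 2 * v

    pickle.dumps(top_level)   # works: pickled by reference to its module-level name
    try:
        pickle.dumps(nested)  # the same failure Pool.map runs into
    except (AttributeError, pickle.PicklingError) as e:
        print(e)              # e.g. Can't pickle local object 'main.<locals>.nested'

if __name__ == "__main__":
    main()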
It sounds like you are looking for multiprocessing.shared_memory. Try this:
import multiprocessing
from multiprocessing import shared_memory
import numpy as np

def init_pool(shm_name, shape, dtype):
    # Runs once in each worker: attach to the existing shared-memory block and
    # rebuild static_vector as a NumPy view onto it. The SharedMemory handle is
    # also kept in a global so it is not garbage-collected (and the mapping
    # closed) while the worker is still using the array.
    global static_vector, existing_shm
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    static_vector = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)

def f(v):
    return np.dot(v, static_vector)

def main():
    static_vector = np.array([1, 2, 3, 4, 5], dtype=np.float64)
    static_vector_shape = static_vector.shape
    static_vector_dtype = static_vector.dtype

    # Create the shared block in the parent and copy the vector into it once.
    shm = shared_memory.SharedMemory(create=True, size=static_vector.nbytes)
    shm_array = np.ndarray(static_vector_shape, dtype=static_vector_dtype, buffer=shm.buf)
    np.copyto(shm_array, static_vector)

    with multiprocessing.Pool(initializer=init_pool,
                              initargs=(shm.name, static_vector_shape, static_vector_dtype)) as pool:
        vectors = [np.random.random(static_vector_shape) for _ in range(10)]
        results = pool.map(f, vectors)
        print(results)

    # The parent owns the block: release its mapping, then remove the block.
    shm.close()
    shm.unlink()

if __name__ == "__main__":
    main()
Here I create a shared memory block and copy static_vector into it. I also use the Pool's initializer and initargs arguments to pass the shared memory name and the array metadata to each process. In the init_pool function, each worker process attaches to the shared memory block and rebuilds static_vector as a NumPy array; the SharedMemory handle is stored in a global as well, so the mapping stays valid for the lifetime of the worker.
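As a side note, if a per-worker copy of the vector is acceptable (it is expensive to compute but not too large to duplicate), the same initializer/initargs mechanism works without shared memory at all: the parent computes the vector once and each worker receives a single pickled copy at startup instead of one per task. A rough sketch of that simpler variant:

import multiprocessing
import numpy as np

def init_pool(vec):
    # Each worker gets one pickled copy of the vector at startup,
    # so the expensive computation still happens only once, in the parent.
    global static_vector
    static_vector = vec

def f(v):
    return np.dot(v, static_vector)

def main():
    static_vector = np.array([1, 2, 3, 4, 5], dtype=np.float64)  # computed once
    with multiprocessing.Pool(initializer=init_pool, initargs=(static_vector,)) as pool:
        results = pool.map(f, [np.random.random(5) for _ in range(10)])
    print(results)

if __name__ == "__main__":
    main()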