How to have an immutable shared object in a Python multiprocessing map

Question

How can all worker processes share the same object, with none of them mutating it? For example, what is the cleanest way to write a function that computes the dot product of its argument with a second vector that is the same for every process? Naively, I would write something like this:

import multiprocessing
import numpy as np

def main():
    static_vector = np.array([1,2,3,4,5])

    def f(v):
        return np.dot(v, static_vector)

    with multiprocessing.Pool() as p:
        results = p.map(f, [np.random.random((5,1)) for _ in range(10)])

    print(results)
    
if __name__ == "__main__":
    main()

But this fails with the error:

AttributeError: Can't pickle local object 'main.<locals>.f'

For the sake of argument, assume computing the static vector takes some time and should not be repeated in every subprocess.
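As an aside, a common workaround that avoids shared memory entirely is to move f to module level (so it can be pickled) and hand the precomputed vector to each worker through the Pool initializer. Each worker then holds its own copy, but the vector is computed only once, in the parent. A minimal sketch; the init_worker name is illustrative:

```python
import multiprocessing
import numpy as np

static_vector = None  # set in each worker by the initializer

def init_worker(vec):
    # runs once per worker process; stores a per-process copy of the vector
    global static_vector
    static_vector = vec

def f(v):
    # module-level, so Pool.map can pickle it
    return np.dot(v, static_vector)

def main():
    vec = np.array([1, 2, 3, 4, 5])  # computed once, in the parent
    with multiprocessing.Pool(initializer=init_worker, initargs=(vec,)) as p:
        results = p.map(f, [np.random.random((5,)) for _ in range(10)])
    print(results)
    return results

if __name__ == "__main__":
    main()
```

This copies the vector into each worker at startup, so it trades memory for simplicity; true sharing needs shared memory, as in the answer below.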

python parallel-processing multiprocessing
1 Answer

It sounds like you are looking for multiprocessing.shared_memory. Try this:

import multiprocessing
from multiprocessing import shared_memory
import numpy as np

def init_pool(shm_name, shape, dtype):
    # keep a reference to the SharedMemory handle for the worker's lifetime;
    # if it were a local, it could be garbage collected and unmap the buffer
    global static_vector, existing_shm
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    static_vector = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)

def f(v):
    return np.dot(v, static_vector)

def main():
    static_vector = np.array([1,2,3,4,5], dtype=np.float64)
    static_vector_shape = static_vector.shape
    static_vector_dtype = static_vector.dtype

    shm = shared_memory.SharedMemory(create=True, size=static_vector.nbytes)
    shm_array = np.ndarray(static_vector_shape, dtype=static_vector_dtype, buffer=shm.buf)
    np.copyto(shm_array, static_vector)

    with multiprocessing.Pool(initializer=init_pool, initargs=(shm.name, static_vector_shape, static_vector_dtype)) as pool:
        vectors = [np.random.random(static_vector_shape) for _ in range(10)]
        results = pool.map(f, vectors)

    print(results)
    shm.close()
    shm.unlink()

if __name__ == "__main__":
    main()

Here I create a shared memory block and copy static_vector into it.

I also use Pool's initializer and initargs parameters to pass the shared memory name and the array metadata to each process. In the init_pool function, each worker process attaches to the shared memory and reconstructs static_vector as a NumPy array.
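Note that shared memory by itself does not make the array immutable: any worker that writes through its NumPy view changes the data for everyone. If mutation should fail loudly, one option is to clear NumPy's writeable flag on the reconstructed array inside init_pool. A sketch of the mechanism (it guards the NumPy view only, not the underlying buffer):

```python
import numpy as np

def as_read_only(arr):
    # clearing the writeable flag makes in-place assignment raise ValueError
    arr.setflags(write=False)
    return arr

vec = as_read_only(np.array([1.0, 2.0, 3.0]))
try:
    vec[0] = 99.0  # attempted mutation
except ValueError:
    print("vec is read-only")  # prints "vec is read-only"
```

In the answer's code, this would mean calling static_vector.setflags(write=False) right after the np.ndarray(...) line in init_pool.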

© www.soinside.com 2019 - 2024. All rights reserved.