Using a numpy array in shared memory for multiprocessing

Votes: 0 · Answers: 6

I would like to use a numpy array in shared memory for use with the multiprocessing module. The difficulty is using it like a numpy array, and not just as a ctypes array.

from multiprocessing import Process, Array
import numpy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = 10
    unshared_arr = numpy.random.rand(N)
    arr = Array('d', unshared_arr)
    print("Originally, the first two elements of arr = %s" % (arr[:2]))

    # Create, start, and finish the child processes
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print("Now, the first two elements of arr = %s" % arr[:2])

This produces output such as:

Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]

The array can be accessed in a ctypes manner, so e.g. arr[i] makes sense. However, it is not a numpy array, and I cannot perform operations such as -1*arr or arr.sum(). I suppose a solution would be to convert the ctypes array into a numpy array. However (besides not being able to get this to work), I don't believe it would be shared anymore.

There would seem to be a standard solution to what must be a common problem.

python numpy multiprocessing shared
6 Answers
100 votes

Adding to @unutbu's (no longer available) and @Henry Gomersall's answers: you could use shared_arr.get_lock() to synchronize access when needed:

shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i): # i could be anything numpy accepts as an index, such as another numpy array
    with shared_arr.get_lock(): # synchronize access
        arr = np.frombuffer(shared_arr.get_obj()) # no data copying
        arr[i] = -arr[i]

Example

import ctypes
import logging
import multiprocessing as mp

from contextlib import closing

import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 100, 11
    shared_arr = mp.Array(ctypes.c_double, N)
    arr = tonumpyarray(shared_arr)

    # fill with random values
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access the same slice
        stop_f = N // 10
        p.map_async(f, [slice(stop_f)]*M)

        # many processes access different slices of the same array
        assert M % 2 # odd
        step = N // 10
        p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
    p.join()
    assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_ # must be inherited, not passed as an argument

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def f(i):
    """synchronized."""
    with shared_arr.get_lock(): # synchronize access
        g(i)

def g(i):
    """no synchronization."""
    info("start %s" % (i,))
    arr = tonumpyarray(shared_arr)
    arr[i] = -1 * arr[i]
    info("end   %s" % (i,))

if __name__ == '__main__':
    mp.freeze_support()
    main()

If you don't need synchronized access, or you create your own locks, then mp.Array() is unnecessary. In that case you could use mp.sharedctypes.RawArray.
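For illustration, here is a minimal sketch of that approach (my own example, not part of the original answer): a RawArray exposes the buffer interface directly, so it can be wrapped with np.frombuffer without calling get_obj() and without any lock.

from multiprocessing.sharedctypes import RawArray
import ctypes

import numpy as np

# allocate shared memory with no synchronizing lock attached
raw = RawArray(ctypes.c_double, 10)

# wrap the shared buffer as a numpy array; no data is copied
arr = np.frombuffer(raw)
arr[:] = np.random.uniform(size=10)   # writes go straight to the shared buffer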


21 votes

The Array object has a get_obj() method associated with it, which returns the ctypes array that presents a buffer interface. I think the following should work...

from multiprocessing import Process, Array
import numpy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = 10
    unshared_arr = numpy.random.rand(N)
    a = Array('d', unshared_arr)
    print("Originally, the first two elements of arr = %s" % (a[:2]))

    # Create, start, and finish the child process
    p = Process(target=f, args=(a,))
    p.start()
    p.join()

    # Print out the changed values
    print("Now, the first two elements of arr = %s" % a[:2])

    # b is a numpy view onto the same shared buffer as a
    b = numpy.frombuffer(a.get_obj())

    b[0] = 10.0
    print(a[0])

When run, this prints out the first element of a now being 10.0, showing that a and b are just two views onto the same memory.

To make sure it is still multiprocessing-safe, I believe you will have to use the acquire and release methods that exist on the Array object a, and its built-in lock, to make sure it is all safely accessed (though I'm not an expert on the multiprocessing module).
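As a rough sketch of that idea (using the same f and a as above; my own illustration, not part of the original answer), the worker could take the Array's built-in lock before touching the data:

def f(a):
    with a.get_lock():        # acquire the Array's built-in lock; released on exit
        a[0] = -a[0]

# or, equivalently, with the acquire/release methods mentioned above
def f_explicit(a):
    a.acquire()
    try:
        a[0] = -a[0]
    finally:
        a.release()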


21 votes

While the answers already given are good, there is a much easier solution to this problem, provided two conditions are met:

  1. You are on a POSIX-compliant operating system (e.g. Linux, Mac OS X); and
  2. Your child processes need read-only access to the shared array.

In this case you do not need to fiddle with explicitly making variables shared, since the child processes will be created with a fork. A forked child automatically shares the parent's memory space. In the context of Python multiprocessing, this means it shares all module-level variables; note that this does not hold for arguments that you explicitly pass to your child processes or to the functions you call on a multiprocessing.Pool or such.

A simple example:

import multiprocessing
import numpy as np

# will hold the (implicitly mem-shared) data
data_array = None

# child worker function
def job_handler(num):
    # built-in id() returns unique memory ID of a variable
    return id(data_array), np.sum(data_array)

def launch_jobs(data, num_jobs=5, num_worker=4):
    global data_array
    data_array = data

    pool = multiprocessing.Pool(num_worker)
    return pool.map(job_handler, range(num_jobs))

# create some random data and execute the child jobs
mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

# this will print 'True' on POSIX OS, since the data was shared
print(np.all(np.asarray(mem_ids) == id(data_array)))

12 votes
I have written a small python module that uses POSIX shared memory to share numpy arrays between python interpreters. Maybe you will find it handy.

https://pypi.python.org/pypi/SharedArray

Here is how it works:

import numpy as np
import SharedArray as sa

# Create an array in shared memory
a = sa.create("test1", 10)

# Attach it as a different array. This can be done from another
# python interpreter as long as it runs on the same computer.
b = sa.attach("test1")

# See how they are actually sharing the same memory block
a[0] = 42
print(b[0])

# Destroying a does not affect b.
del a
print(b[0])

# See how "test1" is still present in shared memory even though we
# destroyed the array a.
sa.list()

# Now destroy the array "test1" from memory.
sa.delete("test1")

# The array b is not affected, but once you destroy it then the
# data are lost.
print(b[0])

9 votes
You can use the sharedmem module: https://bitbucket.org/cleemesser/numpy-sharedmem

Here is your original code, this time using shared memory that behaves like a NumPy array (note the additional final statement calling the NumPy sum() function):

from multiprocessing import Process
import sharedmem
import numpy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = 10
    unshared_arr = numpy.random.rand(N)
    arr = sharedmem.empty(N)
    arr[:] = unshared_arr.copy()
    print("Originally, the first two elements of arr = %s" % (arr[:2]))

    # Create, start, and finish the child process
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print("Now, the first two elements of arr = %s" % arr[:2])

    # Perform some NumPy operation
    print(arr.sum())

7 votes
With Python 3.8+ you can use the multiprocessing.shared_memory standard library module:

# np_sharing.py
from multiprocessing import Process
from multiprocessing.managers import SharedMemoryManager
from multiprocessing.shared_memory import SharedMemory
from typing import Tuple

import numpy as np


def create_np_array_from_shared_mem(
    shared_mem: SharedMemory, shared_data_dtype: np.dtype, shared_data_shape: Tuple[int, ...]
) -> np.ndarray:
    arr = np.frombuffer(shared_mem.buf, dtype=shared_data_dtype)
    arr = arr.reshape(shared_data_shape)
    return arr


def child_process(
    shared_mem: SharedMemory, shared_data_dtype: np.dtype, shared_data_shape: Tuple[int, ...]
):
    """Logic to be executed by the child process"""
    arr = create_np_array_from_shared_mem(shared_mem, shared_data_dtype, shared_data_shape)
    arr[0, 0] = -arr[0, 0]  # modify the array backed by shared memory


def main():
    """Logic to be executed by the parent process"""

    # Data to be shared:
    data_to_share = np.random.rand(10, 10)

    SHARED_DATA_DTYPE = data_to_share.dtype
    SHARED_DATA_SHAPE = data_to_share.shape
    SHARED_DATA_NBYTES = data_to_share.nbytes

    with SharedMemoryManager() as smm:
        shared_mem = smm.SharedMemory(size=SHARED_DATA_NBYTES)

        arr = create_np_array_from_shared_mem(shared_mem, SHARED_DATA_DTYPE, SHARED_DATA_SHAPE)
        arr[:] = data_to_share  # load the data into shared memory

        print(f"The [0,0] element of arr is {arr[0,0]}")  # before

        # Run child process:
        p = Process(target=child_process, args=(shared_mem, SHARED_DATA_DTYPE, SHARED_DATA_SHAPE))
        p.start()
        p.join()

        print(f"The [0,0] element of arr is {arr[0,0]}")  # after

        del arr  # delete np array so the shared memory can be deallocated


if __name__ == "__main__":
    main()
Running the script:

$ python3.10 np_sharing.py
The [0,0] element of arr is 0.262091705529628
The [0,0] element of arr is -0.262091705529628
Since the arrays in the different processes share the same underlying memory buffer, the standard caveats regarding race conditions apply.
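As an illustrative sketch of guarding against such races (my own addition, not part of the answer above; child_process_locked is a hypothetical variant of child_process), a multiprocessing.Lock can be passed to the child process and used to serialize writes:

from multiprocessing import Lock, Process

def child_process_locked(shared_mem, shared_data_dtype, shared_data_shape, lock):
    # same as child_process above, but only one process modifies the buffer at a time
    arr = create_np_array_from_shared_mem(shared_mem, shared_data_dtype, shared_data_shape)
    with lock:
        arr[0, 0] = -arr[0, 0]

# in main(): create lock = Lock() and pass it along in args=(..., lock)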
