SharedMemory numpy 数组多处理问题

问题描述 投票:0回答:1

问题

我有很多数据,我首先使用

SharedMemory
将其加载到 RAM 中,然后使用
multiprocessing.Pool.map
读取许多子进程。

代码

这是我正在使用的简化版本(不是真正的示例投诉):

def SharedObject: # wrapper of SharedMemory passed to subprocesses
  def __init__(self, blablabla):
    self.shmem = SharedMemory(create=True, size=numbers) # reference to shmem will live as long as the instance of the class will (shmem not getting GC)
    temp_arr = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
    temp_arr[:] = ...lot of data... # this array will get destroyed after __init__ finishes

  def __getitem__(self, indices) -> np.ndarray: # used by subprocesses
    selected = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
    return selected.__getitem__(indices)

# This is in main process
shobj = SharedObject()
with multiprocessing.Pool() as pool:
  result= list(pool.map(f, shobj))  # f does shobj.__getitem__

奇怪的行为

我的电脑有 64GB RAM。如果我用很少的数据运行上述算法,一切都会顺利进行,但如果我加载很多(~40GB),那么我会收到以下错误:

n __enter__
return self._semlock.__enter__()
       ^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\mnandcharvim\AppData\Local\Programs\Python\Python312\Lib\multiprocessing\connection.py", line 321, in _recv_bytes
waitres = _winapi.WaitForMultipleObjects(

(基本上是锁误用)

数据只能读取,因此如果我可以将其加载到内存的只读部分中,这样就不会有锁,那对我来说会更好。 这个问题指出 SharedMemory 是无锁的,但此时,根据我得到的错误,我不确定)。

第二版

我还尝试使代码更加符合我链接的官方文档的示例:

shmems = [] # module variable (same module of SharedObject) as suggested in some answers
def SharedObject:
  def __init__(self, blablabla):
    shmem = SharedMemory(name=self.name, create=True, size=numbers) # this reference destroyed after __init__
    shmems.append(shmem) # but this one outlives it
    temp_arr = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
    temp_arr[:] = ...lot of data...

  def __getitem__(self, indices) -> np.ndarray: # used by subprocesses
    shmem = SharedMemory(name=self.name) # added line
    selected = np.ndarray(self.shape, dtype=self.dtype, buffer=shmem.buf)
    return selected.__getitem__(indices)

# This is in main process
shobj = SharedObject()
with multiprocessing.Pool() as pool:
  result= list(pool.map(f, shobj))  # f does shobj.__getitem__

第二个奇怪的行为

第二个版本让我出错,

shmem = SharedMemory(name=self.name) # added line
说我没有足够的系统资源来做
mmap
(奇怪的是,数据已经映射到RAM中:资源足以用于所谓的第一次和一次)加载它)。

注意事项

  • 数据处理仅涉及读取数据
  • 我无法使用线程(与读取的内容一样,我进行非 GIL 释放计算)。
  • 我只是想知道一种清晰、直接和有效的方法,我应该将 SharedMemory 与 numpy 数组和多重处理结合使用。
  • 我需要将数据读取为 numpy 数组

最小可重现示例

要根据您的系统资源正确重现调整参数,以免溢出 RAM。该示例在主 for 循环迭代之一中发生死锁。按

CTRL+C
显示错误。 我在 Windows 10 上使用 Python 3.12.2 64 位。

脚本.py:

from multiprocessing.shared_memory import SharedMemory
import numpy as np
import multiprocessing

class SharedObject:
    def __init__(self):
        self.shape = (2000000, 2049)
        self.shmem = SharedMemory(create=True, size=2000000*8196)
        self.dtype = np.float32
        _temp_arr=np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
        _temp_arr[:] = [np.zeros(shape=2049, dtype=self.dtype) for j in range(2000000)]

def f(data: SharedObject) -> int:
    print("hello")
    return 5

if __name__ == "__main__":
    shobj = SharedObject()

    for j in range(20):
        print(j)
        with multiprocessing.Pool() as pool:
            list(pool.map(f, [shobj]*256)) # 256 calls to f
python numpy multiprocessing shared-memory
1个回答
0
投票

问题中的代码有缺陷。

在共享内存中使用 numpy 数组与多处理结合使用可能最好通过示例来解释。

在此示例中,我们在共享内存中创建一个一维 numpy 整数数组。每次调用子进程 (process()) 时,我们都会向数组的每个元素添加 1。

注意在修改共享内存时使用锁来保护共享内存的基本用途。

process() 被调用 NPROCS 次。因此,一旦所有处理完成,数组的每个元素都应等于 NPROCS。我们断言这是真的。

from multiprocessing.shared_memory import SharedMemory
import numpy as np
from multiprocessing import Pool, Lock

SHM = "my_shared_memory_segment"
N = 1_000
DTYPE = int
NPROCS = 10_000

def process():
    global lock
    try:
        shm = SharedMemory(name=SHM)
        with lock:
            data = np.ndarray((N,), dtype=DTYPE, buffer=shm.buf)
            data[:] += 1
    finally:
        shm.close()

def init_lock(lock_):
    global lock
    lock = lock_

def main():
    try:
        lock = Lock()
        size = N * np.dtype(DTYPE).itemsize
        shm = SharedMemory(name=SHM, create=True, size=size)
        data = np.ndarray((N,), dtype=DTYPE, buffer=shm.buf)
        data.fill(0)
        pool = Pool(initializer=init_lock, initargs=(lock,))
        try:
            for _ in range(NPROCS):
                pool.apply_async(process)
        finally:
            pool.close()
            pool.join()
        assert np.all(data == NPROCS)
    finally:
        shm.close()
        shm.unlink()

if __name__ == "__main__":
    main()

您应该能够调整它以满足您的特定需求。

© www.soinside.com 2019 - 2024. All rights reserved.