I have a lot of data. I first load it into RAM using SharedMemory and then read it from many subprocesses with multiprocessing.Pool.map. This is a simplified version of what I am using (not a real, self-contained example):
class SharedObject: # wrapper of SharedMemory passed to subprocesses
    def __init__(self, blablabla):
        self.shmem = SharedMemory(create=True, size=numbers) # reference to shmem lives as long as the instance does (shmem does not get GC'd)
        temp_arr = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
        temp_arr[:] = ...lot of data... # this array gets destroyed once __init__ finishes

    def __getitem__(self, indices) -> np.ndarray: # used by subprocesses
        selected = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
        return selected.__getitem__(indices)
# This is in the main process
shobj = SharedObject()
with multiprocessing.Pool() as pool:
    result = list(pool.map(f, shobj)) # f does shobj.__getitem__
My machine has 64 GB of RAM. If I run the algorithm above with a small amount of data everything works fine, but if I load a lot (~40 GB) I get the following error:
n __enter__
return self._semlock.__enter__()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\mnandcharvim\AppData\Local\Programs\Python\Python312\Lib\multiprocessing\connection.py", line 321, in _recv_bytes
waitres = _winapi.WaitForMultipleObjects(
(essentially a misuse of locks)

The data only needs to be read, so it would be better for me if I could load it into a read-only region of memory, with no locks involved. This question states that SharedMemory is lock-free, but at this point, given the error I am getting, I am not so sure.
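To illustrate the lock-free, read-only pattern I am after: each worker attaches to the segment by name, copies out what it needs, and detaches. This is only a minimal sketch with hypothetical names (SEG_NAME, read_row), not my actual code:

from multiprocessing import Pool
from multiprocessing.shared_memory import SharedMemory
import numpy as np

SEG_NAME = "my_readonly_data"  # hypothetical segment name
SHAPE = (1_000, 16)
DTYPE = np.float32

def read_row(i: int) -> np.ndarray:
    # Attach by name; concurrent reads need no lock.
    shm = SharedMemory(name=SEG_NAME)
    try:
        arr = np.ndarray(SHAPE, dtype=DTYPE, buffer=shm.buf)
        row = arr[i].copy()  # copy so no view outlives this mapping
        del arr  # release the buffer view before closing
        return row
    finally:
        shm.close()

if __name__ == "__main__":
    size = int(np.prod(SHAPE)) * np.dtype(DTYPE).itemsize
    shm = SharedMemory(name=SEG_NAME, create=True, size=size)
    try:
        np.ndarray(SHAPE, dtype=DTYPE, buffer=shm.buf)[:] = 1.0  # fill once
        with Pool() as pool:
            rows = pool.map(read_row, range(10))
        assert all(row.sum() == SHAPE[1] for row in rows)
    finally:
        shm.close()
        shm.unlink()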
I also tried to make the code follow the example from the official documentation I linked more closely:
shmems = [] # module-level variable (same module as SharedObject), as suggested in some answers

class SharedObject:
    def __init__(self, blablabla):
        shmem = SharedMemory(name=self.name, create=True, size=numbers) # this reference is destroyed after __init__
        shmems.append(shmem) # but this one outlives it
        temp_arr = np.ndarray(self.shape, dtype=self.dtype, buffer=shmem.buf)
        temp_arr[:] = ...lot of data...

    def __getitem__(self, indices) -> np.ndarray: # used by subprocesses
        shmem = SharedMemory(name=self.name) # added line
        selected = np.ndarray(self.shape, dtype=self.dtype, buffer=shmem.buf)
        return selected.__getitem__(indices)
# This is in main process
shobj = SharedObject()
with multiprocessing.Pool() as pool:
result= list(pool.map(f, shobj)) # f does shobj.__getitem__
The second version gives me an error on the added line

shmem = SharedMemory(name=self.name) # added line

saying that I do not have enough system resources for the mmap (which is strange, since the data is already mapped into RAM: the resources were sufficient to load it the first, and supposedly only, time).
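One factor I suspect (an assumption, not a confirmed diagnosis): every __getitem__ call in this version opens a new SharedMemory handle that is never closed, so mappings accumulate in the workers. A minimal sketch of an attach-copy-close accessor, assuming copying the selection is acceptable (read_slice is an illustrative helper, not part of my code):

import numpy as np
from multiprocessing.shared_memory import SharedMemory

def read_slice(seg_name: str, shape, dtype, indices) -> np.ndarray:
    # Attach, copy the requested selection, then close this mapping
    # so handles do not pile up across calls.
    shmem = SharedMemory(name=seg_name)
    try:
        view = np.ndarray(shape, dtype=dtype, buffer=shmem.buf)
        out = np.array(view[indices])  # materialize a copy; nothing references shmem.buf afterwards
        del view  # drop the buffer view before close()
        return out
    finally:
        shmem.close()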
To reproduce this correctly, adjust the parameters to your system resources so that you do not overflow RAM. The example deadlocks in one of the iterations of the main for loop. Pressing CTRL+C reveals the error.
I am using Python 3.12.2 64-bit on Windows 10.

script.py:
from multiprocessing.shared_memory import SharedMemory
import numpy as np
import multiprocessing

class SharedObject:
    def __init__(self):
        self.shape = (2000000, 2049)
        self.shmem = SharedMemory(create=True, size=2000000*8196) # 2049 float32 values per row = 8196 bytes
        self.dtype = np.float32
        _temp_arr = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
        _temp_arr[:] = [np.zeros(shape=2049, dtype=self.dtype) for j in range(2000000)]

def f(data: SharedObject) -> int:
    print("hello")
    return 5

if __name__ == "__main__":
    shobj = SharedObject()
    for j in range(20):
        print(j)
        with multiprocessing.Pool() as pool:
            list(pool.map(f, [shobj]*256)) # 256 calls to f
The code in the question is flawed.

Using numpy arrays in shared memory with multiprocessing is probably best explained with an example.

In this example we create a one-dimensional numpy array of integers in shared memory. Each time the subprocess function (process()) is invoked, we add 1 to every element of the array.

Note the essential use of a lock to protect the shared memory while it is being modified.

process() is invoked NPROCS times. Therefore, once all processing is complete, every element of the array should be equal to NPROCS. We assert that this is true.
from multiprocessing.shared_memory import SharedMemory
import numpy as np
from multiprocessing import Pool, Lock

SHM = "my_shared_memory_segment"
N = 1_000
DTYPE = int
NPROCS = 10_000

def process():
    # Attach to the existing segment by name, then increment under the lock.
    shm = SharedMemory(name=SHM)
    try:
        with lock:
            data = np.ndarray((N,), dtype=DTYPE, buffer=shm.buf)
            data[:] += 1
            del data  # drop the view so close() cannot raise BufferError
    finally:
        shm.close()

def init_lock(lock_):
    # Runs once in each pool worker: make the shared lock a global there.
    global lock
    lock = lock_

def main():
    lock = Lock()
    size = N * np.dtype(DTYPE).itemsize
    shm = SharedMemory(name=SHM, create=True, size=size)
    try:
        data = np.ndarray((N,), dtype=DTYPE, buffer=shm.buf)
        data.fill(0)
        pool = Pool(initializer=init_lock, initargs=(lock,))
        try:
            for _ in range(NPROCS):
                pool.apply_async(process)  # results are not collected; worker errors would go unreported
        finally:
            pool.close()
            pool.join()
        assert np.all(data == NPROCS)
        del data  # release the view before closing the segment
    finally:
        shm.close()
        shm.unlink()

if __name__ == "__main__":
    main()
You should be able to adapt this to your particular needs.
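Since the data in the question is only ever read, the same pattern also works without the lock: concurrent reads of an unchanging buffer need no synchronization. The following is a sketch under that assumption (the segment name and the sum-reduction are illustrative only):

from multiprocessing.shared_memory import SharedMemory
import numpy as np
from multiprocessing import Pool

SHM = "my_readonly_segment"  # hypothetical name
N = 1_000
DTYPE = np.float64
NTASKS = 100

def total() -> float:
    # Read-only access: no lock is needed when nothing mutates the buffer.
    shm = SharedMemory(name=SHM)
    try:
        data = np.ndarray((N,), dtype=DTYPE, buffer=shm.buf)
        result = float(data.sum())  # reduce to a plain float before detaching
        del data  # drop the view so close() succeeds
        return result
    finally:
        shm.close()

def main():
    shm = SharedMemory(name=SHM, create=True, size=N * np.dtype(DTYPE).itemsize)
    try:
        init = np.ndarray((N,), dtype=DTYPE, buffer=shm.buf)
        init[:] = 1.0
        del init  # release the view; workers attach by name
        with Pool() as pool:
            results = [pool.apply_async(total) for _ in range(NTASKS)]
            sums = [r.get() for r in results]  # get() re-raises any worker exception
        assert all(s == float(N) for s in sums)
    finally:
        shm.close()
        shm.unlink()

if __name__ == "__main__":
    main()

Collecting the AsyncResult objects and calling get() on each is a deliberate difference from the example above: it makes exceptions raised inside the workers visible in the parent process instead of being silently discarded.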