背景:
我需要分析一年中每个小时的一些天气数据。对于每个小时,我需要在执行一些计算之前读入每个小时的一些输入。其中一个输入是一个非常大的 numpy 数组
x
,它不会改变,并且一年中的每个小时都是相同的。然后输出是一个向量(1D numpy 数组)y
,其中包含一年中每个小时的计算结果。
目标:
使用多处理模块加快计算时间。特别是,我尝试使用多处理的
x
子模块将 shared_memory
传递给每个进程。
我在 Windows 10 上运行 CPython 3.10.8,使用 Spyder 5.3.3 作为 IDE。
代码(出于测试目的而简化):
import multiprocessing
import numpy as np
from multiprocessing import shared_memory
def multiprocessing_function(args):
nn, shm_name, y_shm_name, shape, dtype, N = args
existing_shm = shared_memory.SharedMemory(name=shm_name)
x = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
existing_y_shm = shared_memory.SharedMemory(name=y_shm_name)
y = np.ndarray((N,), dtype=dtype, buffer=existing_y_shm.buf)
y[nn] = 1
existing_shm.close()
existing_y_shm.close()
if __name__ == '__main__':
x = np.random.rand(int(1e7), 16)
N = 8760 # Number of hours in a year
dtype = x.dtype
shm = shared_memory.SharedMemory(create=True, size=x.nbytes)
shm_array = np.ndarray(x.shape, dtype=x.dtype, buffer=shm.buf)
np.copyto(shm_array, x)
y_shm = shared_memory.SharedMemory(create=True, size=N * x.itemsize)
y_array = np.ndarray((N,), dtype=x.dtype, buffer=y_shm.buf)
args_case = [(nn, shm.name, y_shm.name, x.shape, dtype, N) for nn in range(N)]
with multiprocessing.Pool() as pool:
pool.map(multiprocessing_function, args_case)
y = np.array(y_array)
shm.close()
y_shm.close()
shm.unlink()
y_shm.unlink()
问题:
当我运行代码时,它返回正确的向量,但 50% 的情况下,我会收到“Windows 致命异常:访问冲突”并且内核崩溃。如果我随后更改数组的大小,可能不会有问题,但如果我重新启动 Spyder 并尝试使用新的数组大小重新运行相同的代码,则会出现相同的错误,并且内核将再次崩溃。这种不一致的行为非常奇怪。我感觉这是内存泄漏问题,但我不知道如何解决。
我不知道它遵循的确切路径,但我可以告诉你错误的原因。
在共享内存文件关闭后,Spyder 本身或 IPython shell 正在尝试访问您的共享 numpy 数组之一。我的第一个猜测是 Spyder 试图通过枚举局部变量来填充它的“变量资源管理器”窗格。这会导致对 numpy 数组的访问,但它指向的内存位置不再有效。
SharedMemory
在文件系统上创建文件(因此它们是可共享的),其方式仅驻留在内存中(因此它们很快)。然后,您将获得对该文件作为缓冲区的内存映射访问。根据操作系统的不同,存在一些差异,但总的来说这是正确的。与任何其他文件一样,您有更多的责任自行清理:close()
和 unlink()
。
不幸的是,
Numpy
无法知道它指向的缓冲区已关闭,因此它将继续尝试访问它之前指向的相同内存。 Windows 称之为“访问冲突”,其他人称之为“分段错误”。
解决这个问题:
del shm_array
和 del y_array
。这会将它们从模块范围中删除,以便 IPython 内核不会尝试访问它们。unlink
或计算机重新启动时才会删除该文件。