我的应用程序是在内存中提取zip文件列表并将数据写入临时文件。然后我将内存映射到临时文件中的数据,以便在另一个函数中使用。当我在一个进程中执行此操作时,它工作正常,读取数据不会影响内存,最大RAM大约为40MB。但是,当我使用concurrent.futures执行此操作时,RAM最高可达500MB。
我看过this的例子,我知道我可以以更好的方式提交作业,以便在处理过程中节省内存。但我不认为我的问题是相关的,因为我在处理过程中没有耗尽内存。我不明白的问题是,即使在返回内存映射之后,它仍然保留在内存中。我也不了解内存中的内容,因为在单个进程中执行此操作不会将数据加载到内存中。
任何人都可以解释内存中的实际内容以及为什么单一和并行处理之间存在差异?
PS我使用memory_profiler
来测量内存使用情况
def main():
datadir = './testdata'
files = os.listdir('./testdata')
files = [os.path.join(datadir, f) for f in files]
datalist = download_files(files, multiprocess=False)
print(len(datalist))
time.sleep(15)
del datalist # See here that memory is freed up
time.sleep(15)
def download_files(filelist, multiprocess=False):
datalist = []
if multiprocess:
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
returned_future = [executor.submit(extract_file, f) for f in filelist]
for future in returned_future:
datalist.append(future.result())
else:
for f in filelist:
datalist.append(extract_file(f))
return datalist
def extract_file(input_zip):
buffer = next(iter(extract_zip(input_zip).values()))
with tempfile.NamedTemporaryFile() as temp_logfile:
temp_logfile.write(buffer)
del buffer
data = memmap(temp_logfile, dtype='float32', shape=(2000000, 4), mode='r')
return data
def extract_zip(input_zip):
with ZipFile(input_zip, 'r') as input_zip:
return {name: input_zip.read(name) for name in input_zip.namelist()}
我无法分享我的实际数据,但这里有一些简单的代码来创建证明问题的文件:
for i in range(1, 16):
outdir = './testdata'
outfile = 'file_{}.dat'.format(i)
fp = np.memmap(os.path.join(outdir, outfile), dtype='float32', mode='w+', shape=(2000000, 4))
fp[:] = np.random.rand(*fp.shape)
del fp
with ZipFile(outdir + '/' + outfile[:-4] + '.zip', mode='w', compression=ZIP_DEFLATED) as z:
z.write(outdir + '/' + outfile, outfile)
问题是你试图在进程之间传递np.memmap
,但这不起作用。
最简单的解决方案是传递文件名,并让子进程memmap
为同一个文件。
当你pass an argument to a child process or pool method via multiprocessing
,或从一个返回一个值(包括通过ProcessPoolExecutor
间接这样做)时,它的工作原理是调用pickle.dumps
值,将pickle传递给进程(细节有所不同,但无论是Pipe
还是Queue
都没关系一个memmap
或其他东西),然后在另一边取消结果。
mmap
基本上只是一个ndarray
对象,在mmap
ped内存中分配了mmap
。
并且Python不知道如何挑选PicklingError
对象。 (如果你尝试,你会得到一个BrokenProcessPool
或np.memmap
错误,具体取决于你的Python版本。)
一个np.ndarray
可以被腌制,因为它只是data._mmap
的一个子类 - 但是酸洗和去除它实际上复制了数据并为你提供了一个简单的内存数组。 (如果你看看None
,它是dill
。)如果它给你一个错误而不是默默地复制你的所有数据(腌渍替换库TypeError: can't pickle mmap.mmap objects
就是这样做:mmap
)可能会更好,但事实并非如此。
在进程之间传递底层文件描述符并非不可能 - 每个平台上的细节都不同,但所有主要平台都有办法实现。然后你可以使用传递的fd在接收端构建一个memmap
,然后构建一个np.memmap
。你甚至可以将它包装在dill
的子类中。但我怀疑如果这不是有点困难,有人会已经做到了,事实上它可能已经是numpy
的一部分,如果不是shared memory features of multiprocessing
本身。
另一种方法是明确使用mmap
,并将数组分配给共享内存而不是memmap
。
但最简单的解决方案是,正如我在顶部所说,只是传递文件名而不是对象,并让每一方NamedTemporaryFile
相同的文件。不幸的是,这确实意味着你不能只使用一个关闭时删除qazxswpoi(虽然你使用它的方式已经是不可移植的,并且不会像在Unix上那样在Windows上工作),但是改变这一点仍然可能比其他替代方案更少。