I'm doing some file parsing that is a CPU-bound task. No matter how many files I throw at the process, it uses no more than about 50 MB of RAM. The task is parallelisable, and I've set it up to use concurrent futures below to parse each file as a separate process:
```python
from concurrent import futures

with futures.ProcessPoolExecutor(max_workers=6) as executor:
    # A dictionary mapping each future to the filename it was submitted with
    jobs = {}

    # Loop through the files and run the parse function for each file,
    # sending the file-name to it. The results can come back in any order.
    for this_file in files_list:
        job = executor.submit(parse_function, this_file, **parser_variables)
        jobs[job] = this_file

    # Get the completed jobs whenever they are done
    for job in futures.as_completed(jobs):
        # Fetch the result (job.result()) and the file the job was based on (jobs[job])
        results_list = job.result()
        this_file = jobs[job]
        # Delete the entry from the dict as we don't need to store it
        del jobs[job]
        # Post-processing (putting the results into a database)
        post_process(this_file, results_list)
```
The problem is that when I run this with futures, RAM usage spikes and before long I've run out and Python has crashed. This is probably in large part because the results from parse_function are several MB in size. Once the results have been through post_processing, the application no longer needs them. As you can see, I'm trying `del jobs[job]` to clear items out of `jobs`, but this has made no difference: memory usage stays the same and seems to climb at the same rate.

I've also confirmed it's not because it's waiting on the post_process function, by using only a single process plus throwing in a `time.sleep(1)`.

There's nothing in the futures docs about memory management, and while a brief search suggests it has come up before in real-world applications of futures ("Clear memory in Python loop" and http://grokbase.com/t/python/python-list/1458ss5etz/real-world-use-of-concurrent-futures) - the answers don't translate to my use-case (they're all concerned with timeouts and the like).

So, how do you use concurrent futures without running out of RAM? (Python 3.5)
I'll take a shot (might be a wrong guess...)

You might need to submit your work bit by bit, since on every submit you're making a copy of parser_variables, which may end up chewing your RAM.
Here is working code with "<----" on the interesting parts:

```python
with futures.ProcessPoolExecutor(max_workers=6) as executor:
    # A dictionary mapping each future to the filename it was submitted with
    jobs = {}
    # Loop through the files and run the parse function for each file,
    # sending the file-name to it. The results can come back in any order.
    files_left = len(files_list)  # <----
    files_iter = iter(files_list)  # <------

    while files_left:
        for this_file in files_iter:
            job = executor.submit(parse_function, this_file, **parser_variables)
            jobs[job] = this_file
            if len(jobs) > MAX_JOBS_IN_QUEUE:
                break  # limit the job submission for now

        # Get the completed jobs whenever they are done
        for job in futures.as_completed(jobs):
            files_left -= 1  # one down - many to go... <----
            # Fetch the result (job.result()) and the file the job was based on (jobs[job])
            results_list = job.result()
            this_file = jobs[job]
            # Delete the entry from the dict as we don't need to store it
            del jobs[job]
            # Post-processing (putting the results into a database)
            post_process(this_file, results_list)
            break  # give a chance to add more jobs <-----
```
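The same top-up pattern can be sketched more compactly with `itertools.islice` to prime and refill the queue. This is only a sketch, not the answer's exact code: `parse_function` here is a made-up stand-in, and ThreadPoolExecutor is used so the snippet runs anywhere (swap in ProcessPoolExecutor for real CPU-bound work):

```python
from concurrent import futures
from itertools import islice

MAX_JOBS_IN_QUEUE = 4

def parse_function(name):
    # Stand-in for the real CPU-bound parser
    return name.upper()

files_list = [f"file_{i}" for i in range(10)]
results = {}

with futures.ThreadPoolExecutor(max_workers=2) as executor:
    files_iter = iter(files_list)
    # Prime the queue with the first MAX_JOBS_IN_QUEUE files
    jobs = {executor.submit(parse_function, f): f
            for f in islice(files_iter, MAX_JOBS_IN_QUEUE)}
    while jobs:
        # Take one finished job, then top the queue back up by one
        done = next(futures.as_completed(jobs))
        results[jobs.pop(done)] = done.result()
        for f in islice(files_iter, 1):
            jobs[executor.submit(parse_function, f)] = f

print(len(results))  # 10
```

Because `jobs.pop(done)` drops the only remaining reference to each finished future, its result becomes collectable immediately, and no more than MAX_JOBS_IN_QUEUE results are ever pinned at once.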
Try adding `del` to your code like this:

```python
for job in futures.as_completed(jobs):
    del jobs[job]  # or `val = jobs.pop(job)`
    # del job  # or `job._result = None`
```
Looking at the `concurrent.futures.as_completed()` function, I learned that it is enough to ensure there is no longer any reference to the future. If you dispense with this reference as soon as you get the result, you'll minimise memory usage.

I use a generator expression for storing my `Future` instances, because everything I care about is already returned by the future in its result (basically, the status of the dispatched work). Other implementations use a `dict`, for example as in your case, since you don't return the input file-name as part of the thread worker's result.

Using a generator expression means that once the result is yielded, there is no longer any reference to the `Future`. Internally, `as_completed()` already takes care of removing its own reference after it has handed the completed `Future` to you.
```python
futures = (executor.submit(thread_worker, work) for work in workload)
for future in concurrent.futures.as_completed(futures):
    output = future.result()
    ...  # on the next loop iteration, the result data will be garbage-collected, too
```
Edit: simplified from using a `set` and removing entries, down to simply using a generator expression.
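The point about references is easy to demonstrate: a `Future` caches its result object, so any collection that still holds completed futures pins every result in memory. A minimal sketch (`make_blob` is a made-up stand-in for a worker that returns a large result):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def make_blob(n):
    # Stand-in worker returning a "large" result
    return b"x" * n

with ThreadPoolExecutor(max_workers=2) as ex:
    futs = [ex.submit(make_blob, 1_000_000) for _ in range(8)]
    for f in as_completed(futs):
        blob = f.result()
        # While `futs` still holds f, the 1 MB result stays cached on it;
        # dropping the reference lets it be garbage-collected.
        futs.remove(f)

print(len(futs))  # 0
```

Mutating `futs` inside the loop is safe here because `as_completed()` copies the futures into its own internal set before iterating.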
I have the same problem.

In my case, I need to start millions of threads. For Python 2, I would write a thread pool myself using a dict. But in Python 3, I encountered the following error when I deleted finished threads on the fly:
```
RuntimeError: dictionary changed size during iteration
```
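That error is not specific to threads; it is what CPython raises whenever a dict is mutated while being iterated. A minimal reproduction, with iterating over a snapshot as the fix:

```python
threads = {"t1": "running", "t2": "done", "t3": "done"}

try:
    for name, state in threads.items():
        if state == "done":
            del threads[name]  # mutating the dict mid-iteration
except RuntimeError as e:
    print(e)  # dictionary changed size during iteration

# Fix: iterate over a snapshot of the items instead
for name, state in list(threads.items()):
    if state == "done":
        del threads[name]
```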
So I had to use concurrent.futures. At first, I wrote it like this:
```python
from concurrent.futures import ThreadPoolExecutor
......
if __name__ == '__main__':
    all_resouces = get_all_resouces()
    with ThreadPoolExecutor(max_workers=50) as pool:
        for r in all_resouces:
            pool.submit(handle_resource, *args)
```
But memory was exhausted soon, because memory is only released after all threads have finished. I needed to delete finished threads before starting many more. So I read the docs here: https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures

I found that `Executor.shutdown(wait=True)` might be what I needed. This is my final solution:
```python
from concurrent.futures import ThreadPoolExecutor
......
if __name__ == '__main__':
    all_resouces = get_all_resouces()
    i = 0
    while i < len(all_resouces):
        with ThreadPoolExecutor(max_workers=50) as pool:
            for r in all_resouces[i:i+1000]:
                pool.submit(handle_resource, *args)
        i += 1000
```
You can avoid having to call `shutdown()` explicitly if you use the `with` statement, which will shut down the Executor on exit (waiting as if `Executor.shutdown()` were called with `wait` set to True). Each pass of the loop therefore blocks until the whole batch of 1000 tasks has finished and its memory has been released.
Update:

Just found a better solution:
```python
from concurrent.futures import ThreadPoolExecutor, Future, as_completed
from typing import Set

futures: Set[Future] = set()
with ThreadPoolExecutor(max_workers) as thread_pool:
    for resouce in list/set/iterator/generator:
        if len(futures) >= 1000:
            """
            Release a completed future once more than 1000 futures have been
            created, then submit (create) a new one. This prevents memory
            exhaustion when millions of futures are needed.
            """
            completed_future = next(as_completed(futures))
            futures.remove(completed_future)
        future = thread_pool.submit(resouce_handler, args)
        futures.add(future)
```
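A variant of the same bounded-pending-set idea uses `concurrent.futures.wait` with `return_when=FIRST_COMPLETED`, which hands back every finished future at once instead of one at a time. A runnable sketch (`handle_resource` here is a placeholder worker, not the code above):

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def handle_resource(r):
    # Placeholder for the real worker
    return r * 2

MAX_PENDING = 100
handled = 0

with ThreadPoolExecutor(max_workers=8) as pool:
    pending = set()
    for r in range(1000):
        if len(pending) >= MAX_PENDING:
            # Block until at least one future finishes,
            # then drop every finished future from the pending set
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            handled += len(done)
        pending.add(pool.submit(handle_resource, r))
    # Drain whatever is still pending
    done, pending = wait(pending)
    handled += len(done)

print(handled)  # 1000
```

Because completed futures are discarded as soon as `wait()` returns them, at most MAX_PENDING futures (and their cached results) are alive at any moment.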