我按以下方式并行运行代码:
grouped_data = Parallel(n_jobs=14)(delayed(function)(group) for group in grouped_data)
计算完成后,我可以在系统监视器中看到所有生成的进程仍然处于活动状态并且消耗内存:
并且所有这些进程都不会被杀死,直到主进程终止,这会导致内存泄漏。 如果我通过以下方式对 multiprocessing.Pool 执行相同操作:
pool = Pool(14)
pool.map(apply_wrapper, np.array_split(groups, 14))
pool.close()
pool.join()
然后我看到所有生成的处理最终都终止并且没有内存泄漏。 但是,我需要 joblib 并且它是不稳定的后端,因为它允许序列化一些本地函数。
如何强制终止 joblib.Parallel 生成的进程并释放内存? 我的环境如下:Python 3.8,Ubuntu Linux。
我自己调查后可以总结一下:
import psutil
current_process = psutil.Process()
subproc_before = set([p.pid for p in current_process.children(recursive=True)])
grouped_data = Parallel(n_jobs=14)(delayed(function)(group) for group in grouped_data)
subproc_after = set([p.pid for p in current_process.children(recursive=True)])
for subproc in subproc_after - subproc_before:
print('Killing process with pid {}'.format(subproc))
psutil.Process(subproc).terminate()
那么,考虑到 Иван Судос 答案的第 2 点,创建一个包含 LokyBackend 类的新类并重载 Terminate() 函数是否明智? 例如,
class MyLokyWrapper(LokyBackend):
def terminate(self):
if self._workers is not None:
self._workers.terminate(kill_workers=False)
#if kill_workers, joblib terminates "brutally" the remaining workers
#and their descendants using SIGKILL
self._workers = None
self.reset_batch_stats()
使用 Joblib 和 Loki 后端,你可以做到这一点:
from joblib.externals.loky import get_reusable_executor
get_reusable_executor().shutdown(wait=True)
或者您可以等待 5 分钟,泳池会自行关闭。