我定义和使用了一个类,如下所示,其中包括在多处理池上下文中调用的方法:
from multiprocessing import Pool
class MyClass:
def __init__(self, big_object):
self.big_object = big_object
def compute_one(self, myvar):
return self.big_object.do_something_with(myvar)
def compute_all(self, myvars):
with Pool() as pool:
results = pool.map(compute_one, myvars)
return results
big_object = pd.read_csv('very_big_file.csv')
cls = MyClass(big_object)
vars = ['foo', 'bar', 'baz']
all_results = cls.compute_all(vars)
这个“有效”,但问题是
big_object
在RAM中占用了几Gb,并且使用上面的代码,对于multiprocessing.Pool
启动的每个进程,这个大对象都会复制到RAM中。
如何修改上面的代码,以便
big_object
在所有进程之间共享,但仍然可以在类实例化时定义?
--编辑
我正在尝试做的事情的一些背景可能有助于确定完全不同的方法。
这里,
big_object
是一个 1M+ 行的 Pandas 数据框,有数十列。并且 compute_one
根据特定列计算统计数据。
简化的高级视图将是(极其概括):
current_col = manufacturer
)rem_col
:
big_object.groupby([current_col, rem_col]).size()
最终结果如下:
制造商 | 国家_我们 | country_gb | 客户_男 | 客户_女性 |
---|---|---|---|---|
宝马 | 15 | 18 | 74 | 13 |
奔驰 | 2 | 24 | 12 | 17 |
捷豹 | 48 | 102 | 23 | 22 |
因此本质上,这是关于计算整个源数据帧中每一列、每个其他列的统计数据。
对于给定的
current_col
,此处使用多处理可以并行计算 remaining_cols
上的所有统计数据。这些统计数据中不仅有 sum
,还有 mean
(对于剩余的数字列)。
使用全局变量(从类外部实例化的全局
big_object
)的肮脏方法,将整个运行时间从 5 个多小时缩短到大约 20 分钟。我唯一的问题是我想避免这种全局对象方法。
一种解决方案是使
MyClass
成为 托管 类,就像使用 multiprocessor.Manager().dict()
创建的托管字典一样。为了确保“大对象”只有一份副本,首先我将修改 MyClass.__init__
以采用 CSV 文件路径参数。这样,“大对象”仅在管理者的过程中构建。其次,我将从 compute_all
中删除 MyClass
逻辑,并调用 multiprocessing.pool.Pool.map
方法,使得作为工作函数传递的内容是托管对象的代理。
你节省的空间,你会放弃一些性能,因为每次调用方法
compute_one
都会或多或少相当于对实际执行的管理器进程的远程方法调用。
from multiprocessing import Pool
from multiprocessing.managers import BaseManager
import pandas as pd
class MyClassManager(BaseManager):
pass
class MyClass:
def __init__(self, path):
self.big_object = pd.read_csv(path)
def compute_one(self, myvar):
# For demo purposes just check if column myvar exists in dataframe:
return myvar in self.big_object
# Required for Windows:
if __name__ == '__main__':
MyClassManager.register('MyClass', MyClass)
with MyClassManager() as manager:
cls = manager.MyClass('very_big_file.csv')
# vars is a built-in function name and should not be used:
my_vars = ['foo', 'bar', 'baz']
with Pool() as pool:
all_results = pool.map(cls.compute_one, my_vars)
print(all_results)
对之前回复的一个小补充。
@Booboo 提供的解决方案对我有用。但是,根据“with...manager”块末尾的数据 manager.shutdown() 的大小,由于某种原因会导致权限错误。
该错误似乎只出现在非常大的 cls 对象上:当 cls 占用 5-10% RAM 时,没有错误,当它占用 80% RAM(在我的例子中约为 50GB)时,就会出现错误。
我无法清楚地定义这种行为的原因,但是在管理器关闭之前删除 cls 有助于避免它:
if __name__ == '__main__':
MyClassManager.register('MyClass', MyClass)
with MyClassManager() as manager:
cls = manager.MyClass('very_big_file.csv')
my_vars = ['foo', 'bar', 'baz']
with Pool() as pool:
all_results = pool.map(cls.compute_one, my_vars)
del cls # <------- added line
print(all_results)
以下是权限错误的回溯:
Traceback (most recent call last):
File "D:\folder\main.py", line 73,
in <module> with MyClassManager() as manager:
File "C:\...\Python311\Lib\multiprocessing\managers.py", line 657,
in __exit__ self.shutdown()
File "C:\...\Python311\Lib\multiprocessing\util.py", line 224,
in __call__
res = self._callback(*self._args, **self._kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\...\Python311\Lib\multiprocessing\managers.py", line 681,
in _finalize_manager process.terminate()
File "C:\...\Python311\Lib\multiprocessing\process.py", line 133,
in terminate self._popen.terminate()
File "C:\...\Python311\Lib\multiprocessing\popen_spawn_win32.py", line 124,
in terminate _winapi.TerminateProcess(int(self._handle), TERMINATE)
PermissionError: [WinError 5] Permission denied
Process finished with exit code 1