如何在多个进程之间共享实例变量

问题描述 投票:0回答:2

我定义和使用了一个类,如下所示,其中包括在多处理池上下文中调用的方法:

from multiprocessing import Pool

class MyClass:
    def __init__(self, big_object):
        self.big_object = big_object

    def compute_one(self, myvar):
        return self.big_object.do_something_with(myvar)

    def compute_all(self, myvars):
        with Pool() as pool:
            results = pool.map(compute_one, myvars)
        return results


big_object = pd.read_csv('very_big_file.csv')
cls = MyClass(big_object)

vars = ['foo', 'bar', 'baz']
all_results = cls.compute_all(vars)

这个“有效”,但问题是

big_object
在RAM中占用了几Gb,并且使用上面的代码,对于
multiprocessing.Pool
启动的每个进程,这个大对象都会复制到RAM中。

如何修改上面的代码,以便

big_object
在所有进程之间共享,但仍然可以在类实例化时定义?

--编辑

我正在尝试做的事情的一些背景可能有助于确定完全不同的方法。

这里,

big_object
是一个 1M+ 行的 Pandas 数据框,有数十列。并且
compute_one
根据特定列计算统计数据。

简化的高级视图将是(极其概括):

  • 选择其中一列(例如
    current_col = manufacturer
  • 对于其余分类列中的每个
    rem_col
    • 计算
      big_object.groupby([current_col, rem_col]).size()

最终结果如下:

制造商 国家_我们 country_gb 客户_男 客户_女性
宝马 15 18 74 13
奔驰 2 24 12 17
捷豹 48 102 23 22

因此本质上,这是关于计算整个源数据帧中每一列、每个其他列的统计数据。

对于给定的

current_col
,此处使用多处理可以并行计算
remaining_cols
上的所有统计数据。这些统计数据中不仅有
sum
,还有
mean
(对于剩余的数字列)。

使用全局变量(从类外部实例化的全局

big_object
)的肮脏方法,将整个运行时间从 5 个多小时缩短到大约 20 分钟。我唯一的问题是我想避免这种全局对象方法。

python class multiprocessing instance python-3.8
2个回答
1
投票

一种解决方案是使

MyClass
成为 托管 类,就像使用
multiprocessor.Manager().dict()
创建的托管字典一样。为了确保“大对象”只有一份副本,首先我将修改
MyClass.__init__
以采用 CSV 文件路径参数。这样,“大对象”仅在管理者的过程中构建。其次,我将从
compute_all
中删除
MyClass
逻辑,并调用
multiprocessing.pool.Pool.map
方法,使得作为工作函数传递的内容是托管对象的代理。

你节省的空间,你会放弃一些性能,因为每次调用方法

compute_one
都会或多或少相当于对实际执行的管理器进程的远程方法调用。

from multiprocessing import Pool
from multiprocessing.managers import BaseManager
import pandas as pd

class MyClassManager(BaseManager):
    pass

class MyClass:
    def __init__(self, path):
        self.big_object = pd.read_csv(path)

    def compute_one(self, myvar):
        # For demo purposes just check if column myvar exists in dataframe:
        return myvar in self.big_object

# Required for Windows:
if __name__ == '__main__':
    MyClassManager.register('MyClass', MyClass)
    with MyClassManager() as manager:
        cls = manager.MyClass('very_big_file.csv')

        # vars is a built-in function name and should not be used:
        my_vars = ['foo', 'bar', 'baz']
        with Pool() as pool:
            all_results = pool.map(cls.compute_one, my_vars)
        print(all_results)

0
投票

对之前回复的一个小补充。

@Booboo 提供的解决方案对我有用。但是,根据“with...manager”块末尾的数据 manager.shutdown() 的大小,由于某种原因会导致权限错误。

该错误似乎只出现在非常大的 cls 对象上:当 cls 占用 5-10% RAM 时,没有错误,当它占用 80% RAM(在我的例子中约为 50GB)时,就会出现错误。

我无法清楚地定义这种行为的原因,但是在管理器关闭之前删除 cls 有助于避免它:

if __name__ == '__main__':

MyClassManager.register('MyClass', MyClass)
with MyClassManager() as manager:
    cls = manager.MyClass('very_big_file.csv')

    my_vars = ['foo', 'bar', 'baz']
    with Pool() as pool:
        all_results = pool.map(cls.compute_one, my_vars)

    del cls  # <------- added line

print(all_results)

以下是权限错误的回溯:

Traceback (most recent call last):

File "D:\folder\main.py", line 73, 
        in <module> with MyClassManager() as manager:
File "C:\...\Python311\Lib\multiprocessing\managers.py", line 657, 
        in __exit__  self.shutdown()
File "C:\...\Python311\Lib\multiprocessing\util.py", line 224, 
        in __call__
res = self._callback(*self._args, **self._kwargs)
      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\...\Python311\Lib\multiprocessing\managers.py", line 681, 
        in _finalize_manager process.terminate()
File "C:\...\Python311\Lib\multiprocessing\process.py", line 133, 
        in terminate self._popen.terminate()
File "C:\...\Python311\Lib\multiprocessing\popen_spawn_win32.py", line 124, 
        in terminate _winapi.TerminateProcess(int(self._handle), TERMINATE)

PermissionError: [WinError 5] Permission denied

Process finished with exit code 1
© www.soinside.com 2019 - 2024. All rights reserved.