在类中使用多重处理

问题描述 投票:0回答:1

我有一个 Python 类,用于进行一系列计算,在其中初始化存储在嵌套字典中的许多不同的 pandas.DataFrame。初始化类的瓶颈如下: 我有一个跨越地球的规则点网格和一组也覆盖地球的多边形(代表构造板块)。对于网格中的每个点,我需要找到它们位于哪个板中,并将该板的 ID 分配给列表。 因为这相对较慢(即每次迭代需要 10 秒而不是 <1s), I would like to parallelise this section (and only this section!) of the class. I am new to parallelisation, however, and I ran into some issues.

这是处理查找plateID的函数。

def process_plateIDs(
        geometries_data: list,
        lats_chunk: _numpy.array,
        lons_chunk: _numpy.array,
    ) -> list:
    plateIDs = _numpy.zeros(len(lats_chunk))
    
    for topology_geometry, topology_plateID in geometries_data:
        mask = topology_geometry.contains(_geopandas.points_from_xy(lons_chunk, lats_chunk))
        plateIDs[mask] = topology_plateID

    return plateIDs

我简化了最初存储在 geopandas.DataFrame 中的板几何形状以减少计算时间:

def extract_geometry_data(topology_geometries):
    return [(geom, plateID) for geom, plateID in zip(topology_geometries.geometry, topology_geometries.PLATEID1)]

注意每个实例topology_geometries都存储在主类self.topology_geometries的属性中,它是一个字典,包含不同年龄和情况下的构造板块的几何形状(因此self.topology_geometries[age][case]给出一个geopandas。数据框)。

我正在尝试通过将其从多处理提供给池来并行运行此部分:

# Use all available CPUs if num_workers is not specified
if num_workers is None:
   num_workers = _os.cpu_count()

# Split the data into chunks
chunk_size = len(lats) // num_workers
chunks = [(geometries_data.copy(), lats[i:i + chunk_size].copy(), lons[i:i + chunk_size].copy()) for i in range(0, len(lats), chunk_size)]

# Create a Pool of workers
with Pool(num_workers) as pool:
   # Map the process_chunk function to chunks
   results = pool.starmap(process_plateIDs, chunks)

# Concatenate results from all chunks
plateIDs = _numpy.concatenate(results)

但是,这会导致每个工作人员重新初始化整个类,这根本不是我的意图。我怀疑这是因为我传递的是视图而不是某些变量的单独副本,我试图通过制作每个变量的显式副本来防止这种情况发生(如上面的代码片段所示),但没有成功。

如何告诉 Python 仅显式并行运行 process_plateIDs,并保持其余代码的顺序?它应该在 jupyter 笔记本上运行。

非常感谢!

编辑: 附加信息:我在带有 M1 芯片的 MacOS 上运行此程序。

python parallel-processing multiprocessing
1个回答
0
投票

MacOS 使用

spawn
作为 multiprocessing.set_start_method 所以你需要你的脚本是这样的,请参阅 使用多处理时在 Windows 中强制使用 if name=="main"

import all_needed_modules

if __name__ == "__main__":
    do_actual_code()

这将阻止工作人员在导入您的脚本时运行代码。

第二个问题是传递大量数据,所有参数每次都会序列化和反序列化,因此每次调用具有大数据块的函数是不好的,而是使用全局和初始值设定项来设置它。

some_global_data = None
def set_global_data(data):
    global some_global_data
    some_global_data = data

...
...
#  much later, inside the "__main__" block
with Pool(num_workers, initializer=set_global_data, initargs=(geometries_data,)) as pool:
   # Map the process_chunk function to chunks
   results = pool.starmap(process_plateIDs, chunks)

在你的函数中只需使用这个全局变量,而不是每次都传递巨大的数据块。

最后是 jupyter 的问题....好吧,jupyter 在除了 Linux 之外的任何操作系统上对多处理的支持都非常差,因此为了能够在 Windows 或 MacOS 上使用多处理,你必须

  1. 将您的数据保存在 pickle 文件中。
  2. 制作一个可以通过终端运行的Python脚本
  3. 通过jupyter中的shell运行python脚本
    %run some_script.py some_args
  4. 让脚本将其输出保存在另一个 pickle 文件中。
  5. 从磁盘读取结果

这些步骤至少需要几秒钟......所以你不应该在 jupyter 中使用多重处理来执行小任务。

另一种选择是通过 Linux 容器运行脚本,以便 python 可以使用

fork
并且您可以直接在笔记本中运行脚本。

© www.soinside.com 2019 - 2024. All rights reserved.