我有一个 Python 类,用于进行一系列计算,在其中初始化存储在嵌套字典中的许多不同的 pandas.DataFrame。初始化类的瓶颈如下: 我有一个跨越地球的规则点网格和一组也覆盖地球的多边形(代表构造板块)。对于网格中的每个点,我需要找到它们位于哪个板中,并将该板的 ID 分配给列表。 因为这相对较慢(即每次迭代需要 10 秒而不是 <1s), I would like to parallelise this section (and only this section!) of the class. I am new to parallelisation, however, and I ran into some issues.
这是处理查找plateID的函数。
def process_plateIDs(
geometries_data: list,
lats_chunk: _numpy.array,
lons_chunk: _numpy.array,
) -> list:
plateIDs = _numpy.zeros(len(lats_chunk))
for topology_geometry, topology_plateID in geometries_data:
mask = topology_geometry.contains(_geopandas.points_from_xy(lons_chunk, lats_chunk))
plateIDs[mask] = topology_plateID
return plateIDs
我简化了最初存储在 geopandas.DataFrame 中的板几何形状以减少计算时间:
def extract_geometry_data(topology_geometries):
return [(geom, plateID) for geom, plateID in zip(topology_geometries.geometry, topology_geometries.PLATEID1)]
注意每个实例topology_geometries都存储在主类self.topology_geometries的属性中,它是一个字典,包含不同年龄和情况下的构造板块的几何形状(因此self.topology_geometries[age][case]给出一个geopandas。数据框)。
我正在尝试通过将其从多处理提供给池来并行运行此部分:
# Use all available CPUs if num_workers is not specified
if num_workers is None:
num_workers = _os.cpu_count()
# Split the data into chunks
chunk_size = len(lats) // num_workers
chunks = [(geometries_data.copy(), lats[i:i + chunk_size].copy(), lons[i:i + chunk_size].copy()) for i in range(0, len(lats), chunk_size)]
# Create a Pool of workers
with Pool(num_workers) as pool:
# Map the process_chunk function to chunks
results = pool.starmap(process_plateIDs, chunks)
# Concatenate results from all chunks
plateIDs = _numpy.concatenate(results)
但是,这会导致每个工作人员重新初始化整个类,这根本不是我的意图。我怀疑这是因为我传递的是视图而不是某些变量的单独副本,我试图通过制作每个变量的显式副本来防止这种情况发生(如上面的代码片段所示),但没有成功。
如何告诉 Python 仅显式并行运行 process_plateIDs,并保持其余代码的顺序?它应该在 jupyter 笔记本上运行。
非常感谢!
编辑: 附加信息:我在带有 M1 芯片的 MacOS 上运行此程序。
MacOS 使用
spawn
作为 multiprocessing.set_start_method 所以你需要你的脚本是这样的,请参阅 使用多处理时在 Windows 中强制使用 if name=="main"
import all_needed_modules
if __name__ == "__main__":
do_actual_code()
这将阻止工作人员在导入您的脚本时运行代码。
第二个问题是传递大量数据,所有参数每次都会序列化和反序列化,因此每次调用具有大数据块的函数是不好的,而是使用全局和初始值设定项来设置它。
some_global_data = None
def set_global_data(data):
global some_global_data
some_global_data = data
...
...
# much later, inside the "__main__" block
with Pool(num_workers, initializer=set_global_data, initargs=(geometries_data,)) as pool:
# Map the process_chunk function to chunks
results = pool.starmap(process_plateIDs, chunks)
在你的函数中只需使用这个全局变量,而不是每次都传递巨大的数据块。
最后是 jupyter 的问题....好吧,jupyter 在除了 Linux 之外的任何操作系统上对多处理的支持都非常差,因此为了能够在 Windows 或 MacOS 上使用多处理,你必须
%run some_script.py some_args
这些步骤至少需要几秒钟......所以你不应该在 jupyter 中使用多重处理来执行小任务。
另一种选择是通过 Linux 容器运行脚本,以便 python 可以使用
fork
并且您可以直接在笔记本中运行脚本。