将多处理池与采用数组列表的函数结合使用

问题描述 投票:0回答:1

我正在尝试编写一个函数来并行读取大量文件。我的代码如下:

import numpy as np
from multiprocessing import Pool
from functools import partial

def read_profiles(stamp,name,cols,*args):
   #This function reads each file.
   filename=name + '-' + str(int(timestep[stamp])) + '.dat'
   with open(filename) as f:
      xloc = 0
      for line in f:
         ele = line.rstrip("\n").split()
         for g in len(args):
            args[g][stamp,xloc] = float(ele[cols[g]])
         xloc = xloc + 1

timestep = np.arange(1,51)
x = np.ndarray(shape=(len(timestep),1001))
Ex = np.ndarray(shape=(len(timestep),1001))
j1 = np.ndarray(shape=(len(timestep),1001))
j2 = np.ndarray(shape=(len(timestep),1001))
j3 = np.ndarray(shape=(len(timestep),1001))
j4 = np.ndarray(shape=(len(timestep),1001))

terse_args = [x,Ex]
curr_args = [j1,j2,j3,j4]

with Pool(4) as pool:
   pool.map(partial(read_profiles,name='terse',cols=[0,2],*args=*terse_args),range(len(timestep)))
   pool.map(partial(read_profiles,name='current',cols=[1,2,3,4],*args=*curr_args),range(len(timestep)))

请注意,最后一个参数 (*args) 采用未知数量的二维数组。 上面的代码给我一个错误,指出 *args 处的“无效语法”。我尝试将它们作为不带关键字的位置参数传递,但随后出现错误,指出“名称”有多个值。

有谁知道如何在使用 pool.map 和partial 时包含任意数量的二维数组作为函数的参数?

如果需要任何其他信息,请告诉我。 谢谢。

python python-3.x numpy multiprocessing
1个回答
0
投票

一种可能的解决方案是创建一个自定义部分(可以是具有

__call__
魔术方法的类),例如:

from functools import partial
from multiprocessing import Pool

import numpy as np


def read_profiles(stamp, name, cols, *args):
    # This function reads each file.
    filename = name + "-" + str(int(timestep[stamp])) + ".dat"
    print(f"Opening {filename=}")


class MyPartial:
    def __init__(self, name, cols, *args):
        self.name = name
        self.cols = cols
        self.args = args

    def __call__(self, stamp):
        return read_profiles(
            stamp,
            self.name,
            self.cols,
            *self.args,
        )


if __name__ == "__main__":
    timestep = np.arange(1, 51)
    x = np.ndarray(shape=(len(timestep), 1001))
    Ex = np.ndarray(shape=(len(timestep), 1001))
    j1 = np.ndarray(shape=(len(timestep), 1001))
    j2 = np.ndarray(shape=(len(timestep), 1001))
    j3 = np.ndarray(shape=(len(timestep), 1001))
    j4 = np.ndarray(shape=(len(timestep), 1001))

    terse_args = [x, Ex]
    curr_args = [j1, j2, j3, j4]

    with Pool(4) as pool:
        pool.map(
            MyPartial("terse", [0, 2], *terse_args),
            range(len(timestep)),
        )

        pool.map(
            MyPartial("current", [1, 2, 3, 4], *curr_args),
            range(len(timestep)),
        )

打印:

...

Opening filename='current-46.dat'
Opening filename='current-47.dat'
Opening filename='current-48.dat'
Opening filename='current-49.dat'
Opening filename='current-50.dat'
© www.soinside.com 2019 - 2024. All rights reserved.