可能重复:
如何并行化一个简单的Python循环?
我对 Python 很陌生(使用 Python 3.2),我有一个关于并行化的问题。我有一个 for 循环,我希望使用 Python 3.2 中的“多处理”并行执行:
def computation:
global output
for x in range(i,j):
localResult = ... #perform some computation as a function of i and j
output.append(localResult)
总的来说,我想在 i=0 到 j=100 的范围内执行此计算。因此,我想创建多个进程,每个进程都使用总范围的子域调用函数“计算”。有什么想法可以做到这一点吗?有比使用多处理更好的方法吗?
更具体地说,我想执行域分解,我有以下代码:
from multiprocessing import Pool
class testModule:
def __init__(self):
self
def computation(self, args):
start, end = args
print('start: ', start, ' end: ', end)
testMod = testModule()
length = 100
np=4
p = Pool(processes=np)
p.map(yes tMod.computation, [(length, startPosition, length//np) for startPosition in range(0, length, length//np)])
我收到一条提到 PicklingError 的错误消息。你知道这里可能有什么问题吗?
Joblib 专为环绕多处理而设计,以实现简单的并行循环。 我建议使用它而不是直接处理多重处理。
简单的情况看起来像这样:
from joblib import Parallel, delayed
Parallel(n_jobs=2)(delayed(foo)(i**2) for i in range(10)) # n_jobs = number of processes
一旦你理解了它,语法就很简单。 我们使用生成器语法,其中
delayed
用于调用函数 foo
,其参数包含在后面的括号中。
在您的情况下,您应该使用生成器语法重写 for 循环,或者定义另一个函数(即“worker”函数)来执行单个循环迭代的操作,并将其放入 Parallel 调用的生成器语法中。
在后一种情况下,你会这样做:
Parallel(n_jobs=2)(delayed(foo)(parameters) for x in range(i,j))
其中
foo
是您定义的用于处理 for 循环体的函数。 请注意,您不想追加到列表,因为 Parallel 无论如何都会返回一个列表。
在这种情况下,您可能想要定义一个简单的函数来执行计算并得到
localResult
。
def getLocalResult(args):
""" Do whatever you want in this func.
The point is that it takes x,i,j and
returns localResult
"""
x,i,j = args #unpack args
return doSomething(x,i,j)
现在在您的计算函数中,您只需创建一个工作池并映射本地结果:
import multiprocessing
def computation(np=4):
""" np is number of processes to fork """
p = multiprocessing.Pool(np)
output = p.map(getLocalResults, [(x,i,j) for x in range(i,j)] )
return output
我在这里删除了全局变量,因为它是不必要的(全局变量通常是不必要的)。 在您的通话例程中,您应该只执行
output.extend(computation(np=4))
或类似的操作。
编辑
这是您的代码的“工作”示例:
from multiprocessing import Pool
def computation(args):
length, startPosition, npoints = args
print(args)
length = 100
np=4
p = Pool(processes=np)
p.map(computation, [(startPosition,startPosition+length//np, length//np) for startPosition in range(0, length, length//np)])
请注意,您所做的不起作用,因为您使用实例方法作为函数。 multiprocessing 启动新进程并通过
pickle
在进程之间发送信息,因此,只能使用可以 pickle 的对象。 请注意,无论如何使用实例方法确实没有意义。 每个进程都是父进程的副本,因此进程中发生的任何状态更改都不会传播回父进程。