以下代码并行化了一个 for 循环。
import networkx as nx;
import numpy as np;
from joblib import Parallel, delayed;
import multiprocessing;
def core_func(repeat_index, G, numpy_arrary_2D):
for u in G.nodes():
numpy_arrary_2D[repeat_index][u] = 2;
return;
if __name__ == "__main__":
G = nx.erdos_renyi_graph(100000,0.99);
nRepeat = 5000;
numpy_array = np.zeros([nRepeat,G.number_of_nodes()]);
Parallel(n_jobs=4)(delayed(core_func)(repeat_index, G, numpy_array) for repeat_index in range(nRepeat));
print(np.mean(numpy_array));
可以看到,要打印的预期值为 2。但是,当我在集群(多核、共享内存)上运行代码时,它返回 0.0。
我认为问题在于每个工作人员都创建了自己的
numpy_array
对象副本,而在 main 函数中创建的副本没有更新。如何修改代码以便更新 numpy 数组numpy_array
?
joblib
默认使用processes的多处理池,如其手册所述:
在底层,Parallel 对象创建了一个多处理池 在多个进程中分叉 Python 解释器来执行每个进程 列表中的项目。延迟函数是一个简单的技巧 能够通过函数调用创建元组(函数、args、kwargs) 语法。
这意味着每个进程都会继承数组的原始状态,但是当进程退出时,它写入其中的任何内容都会丢失。仅函数结果被传递回调用(主)进程。但你没有返回任何东西,所以
None
被返回。
要使共享数组可修改,有两种方法:使用线程和使用共享内存。
与进程不同,线程共享内存。因此,您可以写入数组,每个作业都会看到此更改。根据
joblib
手册,是这样完成的:
Parallel(n_jobs=4, backend="threading")(delayed(core_func)(repeat_index, G, numpy_array) for repeat_index in range(nRepeat));
运行时:
$ python r1.py
2.0
但是,当您将复杂的内容写入数组时,请确保正确处理数据或数据块周围的锁,否则您将遇到竞争条件(谷歌它)。
还要仔细阅读 GIL,因为 Python 中的计算多线程是有限的(与 I/O 多线程不同)。
如果您仍然需要进程(例如由于 GIL),您可以将该数组放入共享内存中。
这是一个有点复杂的主题,但是joblib + numpy共享内存示例也显示在
joblib
手册中。
正如 Sergey 在他的回答中所写,进程不共享状态和内存。这就是为什么您看不到预期答案的原因。
线程共享状态和内存空间,因为它们在同一进程下运行。如果您有很多 I/O 操作,这非常有用。由于 GIL
,它不会为您带来更多处理能力(更多 CPU)进程之间通信的一种技术是使用管理器的代理对象。您创建一个管理器对象,它在进程之间同步资源。
Manager() 返回的管理器对象控制一个服务器进程,该进程保存 Python 对象并允许其他进程使用代理来操作它们。
我还没有测试过这段代码(我没有你使用的所有模块),它可能需要对代码进行更多修改,但是使用 Manager 对象它应该看起来像这样
if __name__ == "__main__":
G = nx.erdos_renyi_graph(100000,0.99);
nRepeat = 5000;
manager = multiprocessing.Manager()
numpys = manager.list(np.zeros([nRepeat, G.number_of_nodes()])
Parallel(n_jobs=4)(delayed(core_func)(repeat_index, G, numpys, que) for repeat_index in range(nRepeat));
print(np.mean(numpys));
您不一定需要使用共享内存。
假设您的数组仅以零启动,您只需对结果求和即可:
core_array = [...]
results = joblib.Parallel(...)
for result in results:
core_array += result
在加法运算符正确重载的情况下,该解决方案也是通用的。