python中二维数组运算的有效并行化运算

问题描述 投票:2回答:1

[我正在尝试使用python中的joblib库并行处理二维数组上的操作。这是我的代码

from joblib import Parallel, delayed
import multiprocessing
import numpy as np

# The code below just aggregates the base_array to form a new two dimensional array
base_array = np.ones((2**12, 2**12), dtype=np.uint8)
def compute_average(i, j):
    return np.uint8(np.mean(base_array[i*4: (i+1)*4, j*4: (j+1)*4]))

num_cores = multiprocessing.cpu_count()
new_array = np.array(Parallel(n_jobs=num_cores)(delayed(compute_average)(i, j) 
                                        for i in xrange(0,1024) for j in xrange(0,1024)), dtype=np.uint8)

上面的代码比下面的基本嵌套for循环花费更多的时间。

new_array_nested = np.ones((2**10, 2**10), dtype=np.uint8)
for i in xrange(0,1024):
    for j in xrange(0,1024):
         new_array_nested[i,j] = compute_average(i,j)

为什么并行操作需要更多时间?如何提高上述代码的效率?

python arrays parallel-processing joblib parallel-for
1个回答
4
投票

哇!绝对喜欢您的代码。它的工作原理很吸引人[将总效率提高了400倍。我将尝试阅读有关numba和jit编译器的更多信息,但是您可以简要地写一下为什么它如此高效。再次感谢您提供的所有帮助! –Ram18年1月3日在20:30

我们可以很容易地到达77 [ms]下]的某个地方,但是需要掌握一些步骤才能到达那里,所以让我们开始:


问:为什么并行操作需要更多时间?

因为与

joblib一起提议的步骤会创建许多完整的过程副本-以便逃避GIL步进的pure-[SERIAL]跳舞(一个接一个),但是(!)包括添加在开始对“”执行第一步之前,所有变量以及整个python解释器及其内部状态的所有内存传输的开销(非常昂贵/对确实很大的[[SERIAL]

数组敏感)您的“有效负载”计算策略上的“有用”工作,所以所有这些实例化开销的总和很容易变得比反比例numpy因子的开销无关的期望大,设置1 / N的位置。N ~ num_cores

问:可以帮助提高上述代码的效率吗?


尽可能节省]在所有间接费用上:- 在可能的情况:-

在进程生成侧,尝试使用For details, read the mathematical formulation in the tail part of Amdahl's Law re-formulation here.为“主”进程腾出更多空间,并在性能提高时进行基准测试-在流程终止端,避免从返回值中收集和构造一个新的(可能是较大的)对象,而是预先分配了足够大的本地流程数据结构,并返回了一些有效的对象,进行了序列化,以方便且无阻碍地合并各部分返回的结果的对齐方式。

[这两个“隐藏”成本都是您的主要设计敌人,因为它们线性地添加到整个问题解决方案(n_jobs = ( num_cores - 1 ))的计算路径的纯-[SERIAL]部分中]

实验和结果:


ref.: the effects of both of these in the overhead-strict Amdahl's Law formula

每一张镜头大约需要
>>> from zmq import Stopwatch; aClk = Stopwatch()
>>> base_array = np.ones( (2**12, 2**12), dtype = np.uint8 )
>>> base_array.flags
  C_CONTIGUOUS : True
  F_CONTIGUOUS : False
  OWNDATA      : True
  WRITEABLE    : True
  ALIGNED      : True
  UPDATEIFCOPY : False
>>> def compute_average_per_TILE(               TILE_i,   TILE_j ): // NAIVE MODE
...     return np.uint8( np.mean( base_array[ 4*TILE_i:4*(TILE_i+1),
...                                           4*TILE_j:4*(TILE_j+1)
...                                           ]
...                               )
...                      )
... 
>>> aClk.start(); _ = compute_average_per_TILE( 12,13 ); aClk.stop()
25110
  102
  109
   93
。期望大约93 [us]涵盖整个1024*1024*93 ~ 97,517,568 [us]的均值处理。

实验上,这里可以很好地看到处理不当的开销所产生的影响,朴素的嵌套实验采用了这种方法:base_array

大约减少了3.7倍(由于未产生“尾部”部分(分配各个返回值)的开销为2 ** 20倍,但在终端分配处仅为一次。]

但是,还会有更多惊喜。

这里什么是合适的工具?


从来没有通用的规则,没有一个万能的。

给出

每次调用将不仅仅有一个4x4矩阵块(每个提议的

>>> aClk.start(); _ = [ compute_average_per_TILE( i, j ) for i in xrange(1024) for j in xrange(1024) ]; aClk.stop() 26310594 ^^...... 26310594 / 1024. / 1024. == 25.09 [us/cell]

精心策划的25 [us]调用产生的实际[小于joblib,分布在〜< [2**20由原始提案完全实例化的过程.cpu_count()确实存在提高性能的空间。对于这些小型矩阵(并不是所有问题在这个意义上都这么高兴),人们可以从更智能的内存访问模式以及减少python GIL起源的弱点中获得最佳结果。
由于每次通话时间跨度仅为4x4微型计算,因此更好的方法是利用智能矢量化(所有数据都适合缓存,因此缓存内计算是寻求最大性能的假日之旅)

最好的(仍然是非常天真的矢量化代码)能够从...( joblib.Parallel( n_jobs = num_cores )( joblib.delayed( compute_average )( i, j ) for i in xrange( 1024 ) for j in xrange( 1024 ) ) 变为

小于~ 25 [us/cell]

(还有空间可以进行更好的对齐处理,因为它花费了[[~ 74 [ns/cell] / ~ 4.6 [ns]单元格处理)),因此可以期待如果可以对缓存中优化的矢量化代码进行适当设计,则可以进一步提高速度。

base_array中?!值得做对,不是吗?

不是97秒,不是25秒但小于77 [ms]仅需敲击几下键盘,如果可以更好地优化呼叫签名,就可以压缩更多的东西:77 [ms]

© www.soinside.com 2019 - 2024. All rights reserved.