在另一个Q + A(Can I perform dynamic cumsum of rows in pandas?)中,我就使用prange
关于此代码(this answer)的正确性发表了评论:
from numba import njit, prange
@njit
def dynamic_cumsum(seq, index, max_value):
cumsum = []
running = 0
for i in prange(len(seq)):
if running > max_value:
cumsum.append([index[i], running])
running = 0
running += seq[i]
cumsum.append([index[-1], running])
return cumsum
评论是:
我不建议并行化一个不纯的循环。在这种情况下,
running
变量使其不纯。有4种可能的结果:(1)numba决定它不能并行化它只是处理循环,好像它是cumsum
而不是prange
(2)它可以将变量提升到循环之外并在余数上使用并行化(3)numba错误地在并行执行之间插入同步,结果可能是伪造的(4)numba在运行时插入必要的同步,这可能会产生比通过并行化首先获得更多的开销
而后来的补充:
当然,
running
和cumsum
变量都会使循环“不纯”,而不仅仅是前面评论中所述的运行变量
然后我被问到:
这可能听起来像一个愚蠢的问题,但我怎么能弄清楚它做了哪4件事并改进了呢?我真的想用numba变得更好!
鉴于它可能对未来的读者有用,我决定在这里创建一个自我回答的Q + A.掠夺者:我无法真正回答4个结果中的哪一个产生的问题(或者如果numba产生完全不同的结果),所以我非常鼓励其他答案。
TL; DR:第一:prange
与range
相同,除非你加入jit
平行,例如njit(parallel=True)
。如果你试试,你会看到一个关于“不支持的减少”的例外 - 这是因为Numba将prange
的范围限制为“纯”循环和“不纯循环”,并且支持numba支持的减少,并确保它的责任进入用户的这两个类别中的任何一个。
这在numbas prange
(version 0.42)的文档中有明确说明:
1.10.2。显式并行循环
此代码转换过程的另一个特性是支持显式并行循环。可以使用Numba的
prange
而不是range
来指定循环可以并行化。除了支持的减少之外,用户需要确保循环没有交叉迭代依赖性。
评论中称为“不纯”的内容在该文档中称为“交叉迭代依赖”。这种“交叉迭代依赖性”是在循环之间变化的变量。一个简单的例子是:
def func(n):
a = 0
for i in range(n):
a += 1
return a
这里变量a
取决于它在循环开始之前的值以及循环执行了多少次迭代。这就是“交叉迭代依赖”或“不纯”循环的含义。
显式并行化这种循环时的问题是迭代是并行执行的,但每次迭代都需要知道其他迭代正在做什么。如果不这样做会导致错误的结果。
让我们暂时假设prange
会产生4个工人,我们将4
作为n
传递给函数。一个完全天真的实现会做什么?
Worker 1 starts, gets a i = 1 from `prange`, and reads a = 0
Worker 2 starts, gets a i = 2 from `prange`, and reads a = 0
Worker 3 starts, gets a i = 3 from `prange`, and reads a = 0
Worker 1 executed the loop and sets `a = a + 1` (=> 1)
Worker 3 executed the loop and sets `a = a + 1` (=> 1)
Worker 4 starts, gets a i = 4 from `prange`, and reads a = 2
Worker 2 executed the loop and sets `a = a + 1` (=> 1)
Worker 4 executed the loop and sets `a = a + 1` (=> 3)
=> Loop ended, function return 3
不同的工作人员读取,执行和写入a
的顺序可以是任意的,这只是一个例子。它也可能(偶然)产生正确的结果!这通常被称为Race condition。
更复杂的prange
会认识到存在这样的交叉迭代依赖性会是什么?
有三种选择:
鉴于我对numba文档的理解(再次重复):
除了支持的减少之外,用户需要确保循环没有交叉迭代依赖性。
Numba:
不幸的是,目前还不清楚“支持减少”是什么。但是文档暗示它是循环体中前一个值的二元运算符:
如果变量由二元函数/运算符使用其在循环体中的先前值更新,则自动推断减少。对于
+=
和*=
运营商,自动推断出减少的初始值。对于其他函数/运算符,reduce变量应在进入prange
循环之前保持标识值。对于标量和任意维度的数组,支持以这种方式减少。
OP中的代码使用列表作为交叉迭代依赖项,并在循环体中调用list.append
。我个人不会将list.append
称为减少而且它不使用二元运算符,所以我的假设是它很可能不受支持。至于其他交叉迭代依赖性running
:它在前一次迭代的结果上使用加法(这很好)但如果它超过阈值(可能不是很好)也有条件地将其重置为零。
Numba提供了检查中间代码(LLVM和ASM)代码的方法:
dynamic_cumsum.inspect_types()
dynamic_cumsum.inspect_llvm()
dynamic_cumsum.inspect_asm()
但即使我对结果有必要的理解,也无法对发出的代码的正确性做出任何陈述 - 通常,“证明”多线程/进程代码正常工作是非常不重要的。鉴于我甚至缺乏LLVM和ASM知识,即使它甚至试图并行化它,我实际上也无法回答你的具体问题。
回到代码,正如所提到的,如果我使用parallel=True
它会引发异常(不支持的缩减),所以我假设numba不会并行化示例中的任何内容:
from numba import njit, prange
@njit(parallel=True)
def dynamic_cumsum(seq, index, max_value):
cumsum = []
running = 0
for i in prange(len(seq)):
if running > max_value:
cumsum.append([index[i], running])
running = 0
running += seq[i]
cumsum.append([index[-1], running])
return cumsum
dynamic_cumsum(np.ones(100), np.arange(100), 10)
AssertionError: Invalid reduction format During handling of the above exception, another exception occurred: LoweringError: Failed in nopython mode pipeline (step: nopython mode backend) Invalid reduction format File "<>", line 7: def dynamic_cumsum(seq, index, max_value): <source elided> running = 0 for i in prange(len(seq)): ^ [1] During: lowering "id=2[LoopNest(index_variable = parfor_index.192, range = (0, seq_size0.189, 1))]{56: <ir.Block at <> (10)>, 24: <ir.Block at <> (7)>, 34: <ir.Block at <> (8)>}Var(parfor_index.192, <> (7))" at <> (7)
那么还有什么可说的:在这种情况下,prange
在正常的range
上没有提供任何速度优势(因为它没有并行执行)。因此,在这种情况下,我不会“冒险”潜在的问题和/或使读者感到困惑 - 因为根据numba文档,它不受支持。
from numba import njit, prange
@njit
def p_dynamic_cumsum(seq, index, max_value):
cumsum = []
running = 0
for i in prange(len(seq)):
if running > max_value:
cumsum.append([index[i], running])
running = 0
running += seq[i]
cumsum.append([index[-1], running])
return cumsum
@njit
def dynamic_cumsum(seq, index, max_value):
cumsum = []
running = 0
for i in range(len(seq)): # <-- here is the only change
if running > max_value:
cumsum.append([index[i], running])
running = 0
running += seq[i]
cumsum.append([index[-1], running])
return cumsum
只是一个快速的时间,支持我之前做的“不快于”的声明:
import numpy as np
seq = np.random.randint(0, 100, 10_000_000)
index = np.arange(10_000_000)
max_ = 500
# Correctness and warm-up
assert p_dynamic_cumsum(seq, index, max_) == dynamic_cumsum(seq, index, max_)
%timeit p_dynamic_cumsum(seq, index, max_)
# 468 ms ± 12.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit dynamic_cumsum(seq, index, max_)
# 470 ms ± 9.49 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)