如何确定numba的prange实际上是否正常工作?

问题描述 投票:6回答:1

在另一个Q + A(Can I perform dynamic cumsum of rows in pandas?)中,我就使用prange关于此代码(this answer)的正确性发表了评论:

from numba import njit, prange

@njit
def dynamic_cumsum(seq, index, max_value):
    cumsum = []
    running = 0
    for i in prange(len(seq)):
        if running > max_value:
            cumsum.append([index[i], running])
            running = 0
        running += seq[i] 
    cumsum.append([index[-1], running])

    return cumsum

评论是:

我不建议并行化一个不纯的循环。在这种情况下,running变量使其不纯。有4种可能的结果:(1)numba决定它不能并行化它只是处理循环,好像它是cumsum而不是prange(2)它可以将变量提升到循环之外并在余数上使用并行化(3)numba错误地在并行执行之间插入同步,结果可能是伪造的(4)numba在运行时插入必要的同步,这可能会产生比通过并行化首先获得更多的开销

而后来的补充:

当然,runningcumsum变量都会使循环“不纯”,而不仅仅是前面评论中所述的运行变量

然后我被问到:

这可能听起来像一个愚蠢的问题,但我怎么能弄清楚它做了哪4件事并改进了呢?我真的想用numba变得更好!

鉴于它可能对未来的读者有用,我决定在这里创建一个自我回答的Q + A.掠夺者:我无法真正回答4个结果中的哪一个产生的问题(或者如果numba产生完全不同的结果),所以我非常鼓励其他答案。

python parallel-processing numba
1个回答
3
投票

TL; DR:第一:prangerange相同,除非你加入jit平行,例如njit(parallel=True)。如果你试试,你会看到一个关于“不支持的减少”的例外 - 这是因为Numba将prange的范围限制为“纯”循环和“不纯循环”,并且支持numba支持的减少,并确保它的责任进入用户的这两个类别中的任何一个。

这在numbas prange (version 0.42)的文档中有明确说明:

1.10.2。显式并行循环

此代码转换过程的另一个特性是支持显式并行循环。可以使用Numba的prange而不是range来指定循环可以并行化。除了支持的减少之外,用户需要确保循环没有交叉迭代依赖性。

评论中称为“不纯”的内容在该文档中称为“交叉迭代依赖”。这种“交叉迭代依赖性”是在循环之间变化的变量。一个简单的例子是:

def func(n):
    a = 0
    for i in range(n):
        a += 1
    return a

这里变量a取决于它在循环开始之前的值以及循环执行了多少次迭代。这就是“交叉迭代依赖”或“不纯”循环的含义。

显式并行化这种循环时的问题是迭代是并行执行的,但每次迭代都需要知道其他迭代正在做什么。如果不这样做会导致错误的结果。

让我们暂时假设prange会产生4个工人,我们将4作为n传递给函数。一个完全天真的实现会做什么?

Worker 1 starts, gets a i = 1 from `prange`, and reads a = 0
Worker 2 starts, gets a i = 2 from `prange`, and reads a = 0
Worker 3 starts, gets a i = 3 from `prange`, and reads a = 0
Worker 1 executed the loop and sets `a = a + 1` (=> 1)
Worker 3 executed the loop and sets `a = a + 1` (=> 1)
Worker 4 starts, gets a i = 4 from `prange`, and reads a = 2
Worker 2 executed the loop and sets `a = a + 1` (=> 1)
Worker 4 executed the loop and sets `a = a + 1` (=> 3)

=> Loop ended, function return 3

不同的工作人员读取,执行和写入a的顺序可以是任意的,这只是一个例子。它也可能(偶然)产生正确的结果!这通常被称为Race condition

更复杂的prange会认识到存在这样的交叉迭代依赖性会是什么?

有三种选择:

  • 只是不要并行化。
  • 实现工作者共享变量的机制。这里的典型例子是Locks(这会产生很高的开销)。
  • 认识到它是可以并行化的减少。

鉴于我对numba文档的理解(再次重复):

除了支持的减少之外,用户需要确保循环没有交叉迭代依赖性。

Numba:

  • 如果它是已知的减少,那么使用模式来并行化它
  • 如果它不是一个已知的减少抛出异常

不幸的是,目前还不清楚“支持减少”是什么。但是文档暗示它是循环体中前一个值的二元运算符:

如果变量由二元函数/运算符使用其在循环体中的先前值更新,则自动推断减少。对于+=*=运营商,自动推断出减少的初始值。对于其他函数/运算符,reduce变量应在进入prange循环之前保持标识值。对于标量和任意维度的数组,支持以这种方式减少。

OP中的代码使用列表作为交叉迭代依赖项,并在循环体中调用list.append。我个人不会将list.append称为减少而且它不使用二元运算符,所以我的假设是它很可能不受支持。至于其他交叉迭代依赖性running:它在前一次迭代的结果上使用加法(这很好)但如果它超过阈值(可能不是很好)也有条件地将其重置为零。

Numba提供了检查中间代码(LLVM和ASM)代码的方法:

dynamic_cumsum.inspect_types()
dynamic_cumsum.inspect_llvm()
dynamic_cumsum.inspect_asm()

但即使我对结果有必要的理解,也无法对发出的代码的正确性做出任何陈述 - 通常,“证明”多线程/进程代码正常工作是非常不重要的。鉴于我甚至缺乏LLVM和ASM知识,即使它甚至试图并行化它,我实际上也无法回答你的具体问题。

回到代码,正如所提到的,如果我使用parallel=True它会引发异常(不支持的缩减),所以我假设numba不会并行化示例中的任何内容:

from numba import njit, prange

@njit(parallel=True)
def dynamic_cumsum(seq, index, max_value):
    cumsum = []
    running = 0
    for i in prange(len(seq)):
        if running > max_value:
            cumsum.append([index[i], running])
            running = 0
        running += seq[i] 
    cumsum.append([index[-1], running])

    return cumsum

dynamic_cumsum(np.ones(100), np.arange(100), 10)
AssertionError: Invalid reduction format

During handling of the above exception, another exception occurred:

LoweringError: Failed in nopython mode pipeline (step: nopython mode backend)
Invalid reduction format

File "<>", line 7:
def dynamic_cumsum(seq, index, max_value):
    <source elided>
    running = 0
    for i in prange(len(seq)):
    ^

[1] During: lowering "id=2[LoopNest(index_variable = parfor_index.192, range = (0, seq_size0.189, 1))]{56: <ir.Block at <> (10)>, 24: <ir.Block at <> (7)>, 34: <ir.Block at <> (8)>}Var(parfor_index.192, <> (7))" at <> (7)

那么还有什么可说的:在这种情况下,prange在正常的range上没有提供任何速度优势(因为它没有并行执行)。因此,在这种情况下,我不会“冒险”潜在的问题和/或使读者感到困惑 - 因为根据numba文档,它不受支持。

from numba import njit, prange

@njit
def p_dynamic_cumsum(seq, index, max_value):
    cumsum = []
    running = 0
    for i in prange(len(seq)):
        if running > max_value:
            cumsum.append([index[i], running])
            running = 0
        running += seq[i] 
    cumsum.append([index[-1], running])

    return cumsum

@njit
def dynamic_cumsum(seq, index, max_value):
    cumsum = []
    running = 0
    for i in range(len(seq)):  # <-- here is the only change
        if running > max_value:
            cumsum.append([index[i], running])
            running = 0
        running += seq[i] 
    cumsum.append([index[-1], running])

    return cumsum

只是一个快速的时间,支持我之前做的“不快于”的声明:

import numpy as np
seq = np.random.randint(0, 100, 10_000_000)
index = np.arange(10_000_000)
max_ = 500
# Correctness and warm-up
assert p_dynamic_cumsum(seq, index, max_) == dynamic_cumsum(seq, index, max_)
%timeit p_dynamic_cumsum(seq, index, max_)
# 468 ms ± 12.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit dynamic_cumsum(seq, index, max_)
# 470 ms ± 9.49 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
© www.soinside.com 2019 - 2024. All rights reserved.