在 numba 中使用并行化对于一种算法更好,但对于另一种类似算法则不然?

问题描述 投票:0回答:1

我正在模拟具有相同质量和随机速度的

N
粒子。我想计算系统的总能量和总动量。我考虑了 4 种方法,并想对它们进行基准测试,看看哪一种效果最好:

  1. 使用常规 numpy 例程和计算。
  2. 使用
    numba.jit
    并自己编写 for 循环。
  3. numba.jit
    parallel=True
    结合使用以利用多个核心。
  4. 根据粒子数量使用后两种方法的混合版本。如果我们没有足够的粒子,那么并行化的开销就不值得。

这是能量计算的代码:

from functools import wraps

import numpy as np
import numba as nb
import numpy.typing as npt
import perfplot

jit_opts = dict(
    nopython=True, nogil=True, cache=False, error_model="numpy", fastmath=True
)

rng = np.random.default_rng()


def hybrid_parallel_jit(get_threshold, *hpj_args, default_threshold=None, **hpj_kwargs):
    def _hybrid_parallel_jit(func):
        if "parallel" in hpj_kwargs:
            raise ValueError("Cannot specify 'parallel' for hybrid jitted function")
        hpj_kwargs["parallel"] = True
        func_parallel = nb.jit(*hpj_args, **hpj_kwargs)(func)
        hpj_kwargs["parallel"] = False
        func_non_parallel = nb.jit(*hpj_args, **hpj_kwargs)(func)

        @wraps(func)
        def hybrid_func(*args, threshold=None, **kwargs):
            if threshold is None:
                threshold = default_threshold
                if threshold is None:
                    raise ValueError(
                        "`threshold` keyword argument was not provided, and a "
                        "default threshold was not set either."
                    )
            return (
                func_non_parallel(*args, **kwargs)
                if get_threshold(*args, **kwargs) < threshold
                else func_parallel(*args, **kwargs)
            )

        return hybrid_func

    return _hybrid_parallel_jit


def random_unit_vectors(n: int) -> npt.NDArray[float]:
    phi = rng.uniform(0, 2 * np.pi, n)
    cos_theta = rng.uniform(-1, 1, n)
    sin_theta = np.sqrt(1 - cos_theta**2)
    return np.column_stack(
        (sin_theta * np.cos(phi), sin_theta * np.sin(phi), cos_theta)
    )


@nb.jit([nb.float64(nb.float64[:, :])], **jit_opts)
def total_energy(velocities: npt.NDArray[float]) -> float:
    energy = 0
    for i in range(velocities.shape[0]):
        energy += velocities[i, 0] ** 2 + velocities[i, 1] ** 2 + velocities[i, 2] ** 2
    return energy / 2


@nb.jit([nb.float64(nb.float64[:, :])], parallel=True, **jit_opts)
def total_energy_parallel(velocities: npt.NDArray[float]) -> float:
    energy = 0
    for i in nb.prange(velocities.shape[0]):
        energy += velocities[i, 0] ** 2 + velocities[i, 1] ** 2 + velocities[i, 2] ** 2
    return energy / 2


def total_energy_numpy(velocities: npt.NDArray[float]) -> float:
    return np.einsum("ij, ij->", velocities, velocities) / 2


@hybrid_parallel_jit(
    lambda v: v.shape[0],
    [nb.float64(nb.float64[:, :])],
    **jit_opts,
)
def total_energy_improved(velocities: npt.NDArray[float]) -> float:
    energy = 0
    for i in nb.prange(velocities.shape[0]):
        energy += velocities[i, 0] ** 2 + velocities[i, 1] ** 2 + velocities[i, 2] ** 2
    return energy / 2


perfplot.show(
    setup=lambda n: random_unit_vectors(n),
    kernels=[
        lambda v: total_energy_numpy(v),
        lambda v: total_energy(v),
        lambda v: total_energy_parallel(v),
        lambda v: total_energy_improved(v, threshold=10_000),
    ],
    labels=["numpy", "numba", "numba parallel", "numba improved"],
    n_range=[10**k for k in range(9)],
    xlabel="len(a)",
)

如果

@hybrid_parallel_jit
大于指定阈值,则
N
装饰器基本上调用并行版本,否则调用非并行版本。我运行这个得到的结果:

这些结果符合我的预期:并行版本更适合大型

N
(在我的机器上大于 10,000),非并行版本更适合较小的
N
。看起来混合版本(选项 4)是整体最佳选择。

我修改了代码,对动量计算进行了类似的测试:

from functools import wraps

import numpy as np
import numba as nb
import numpy.typing as npt
import perfplot

jit_opts = dict(
    nopython=True, nogil=True, cache=False, error_model="numpy", fastmath=True
)

rng = np.random.default_rng()


def hybrid_parallel_jit(get_threshold, *hpj_args, default_threshold=None, **hpj_kwargs):
    def _hybrid_parallel_jit(func):
        if "parallel" in hpj_kwargs:
            raise ValueError("Cannot specify 'parallel' for hybrid jitted function")
        hpj_kwargs["parallel"] = True
        func_parallel = nb.jit(*hpj_args, **hpj_kwargs)(func)
        hpj_kwargs["parallel"] = False
        func_non_parallel = nb.jit(*hpj_args, **hpj_kwargs)(func)

        @wraps(func)
        def hybrid_func(*args, threshold=None, **kwargs):
            if threshold is None:
                threshold = default_threshold
                if threshold is None:
                    raise ValueError(
                        "`threshold` keyword argument was not provided, and a "
                        "default threshold was not set either."
                    )
            return (
                func_non_parallel(*args, **kwargs)
                if get_threshold(*args, **kwargs) < threshold
                else func_parallel(*args, **kwargs)
            )

        return hybrid_func

    return _hybrid_parallel_jit


def random_unit_vectors(n: int) -> npt.NDArray[float]:
    phi = rng.uniform(0, 2 * np.pi, n)
    cos_theta = rng.uniform(-1, 1, n)
    sin_theta = np.sqrt(1 - cos_theta**2)
    return np.column_stack(
        (sin_theta * np.cos(phi), sin_theta * np.sin(phi), cos_theta)
    )


@nb.jit([nb.float64[:](nb.float64[:, :])], **jit_opts)
def total_momentum(velocities: npt.NDArray[float]) -> float:
    momentum = np.zeros(3)
    for i in range(velocities.shape[0]):
        momentum += velocities[i]
    return momentum


@nb.jit([nb.float64[:](nb.float64[:, :])], parallel=True, **jit_opts)
def total_momentum_parallel(velocities: npt.NDArray[float]) -> float:
    momentum = np.zeros(3)
    for i in nb.prange(velocities.shape[0]):
        momentum += velocities[i]
    return momentum


def total_momentum_numpy(velocities: npt.NDArray[float]) -> float:
    return np.sum(velocities, axis=0)


@hybrid_parallel_jit(
    lambda v: v.shape[0],
    [nb.float64[:](nb.float64[:, :])],
    **jit_opts,
)
def total_momentum_improved(velocities: npt.NDArray[float]) -> float:
    momentum = np.zeros(3)
    for i in nb.prange(velocities.shape[0]):
        momentum += velocities[i]
    return momentum


perfplot.show(
    setup=lambda n: random_unit_vectors(n),
    kernels=[
        lambda v: total_momentum_numpy(v),
        lambda v: total_momentum(v),
        lambda v: total_momentum_parallel(v),
        lambda v: total_momentum_improved(v, threshold=10_000),
    ],
    labels=["numpy", "numba", "numba parallel", "numba improved"],
    n_range=[10**k for k in range(9)],
    xlabel="len(a)",
)

动量计算与能量计算非常相似,因此我期望得到类似的图表和相同的结论,即选项 4 效果最好。然而,当我运行代码时,我得到:

非并行 numba 版本对于小型和大型

N
来说都是最快的。这些结果让我非常惊讶。动量的计算与能量的计算非常相似。在这两种情况下,我都会以某种方式对速度进行求和,这种方式在运行时成本方面似乎是直接的。有人可以解释为什么我对这两个非常相似的计算得到如此不同的结果吗?更具体地说,为什么使用并行化似乎对动量计算没有帮助,即使对于大的
N
也是如此?任何对这里发生的事情的见解将不胜感激。

python numpy benchmarking numba
1个回答
0
投票

查看您的功能:

@nb.jit([nb.float64[:](nb.float64[:, :])], parallel=True, **jit_opts)
def total_momentum_parallel(velocities: npt.NDArray[float]) -> float:
    momentum = np.zeros(3)
    for i in nb.prange(velocities.shape[0]):
        momentum += velocities[i]
    return momentum

表明每个线程都试图同时访问同一内存位置(在本例中为

momentum
)。

当然,这意味着需要一些开销(锁定)才能不损坏数据。尝试将行

momentum += velocities[i]
更改为:

momentum[0] += velocities[i, 0]
momentum[1] += velocities[i, 1]
momentum[2] += velocities[i, 2]

现在该功能将无法使用。

补救措施是为每个线程分配动量向量,并在最后对向量求和,例如:

@nb.jit([nb.float64[:](nb.float64[:, :])], parallel=True, **jit_opts)
def total_momentum_parallel(velocities: npt.NDArray[float]) -> float:
    N = nb.get_num_threads()
    rows = velocities.shape[0]

    chunks = rows // N + 1
    momentum = np.zeros(shape=(N, 3))

    for thread_idx in nb.prange(N):
        for i in range(chunks * thread_idx, chunks * (thread_idx + 1)):
            if i >= rows:
                break
            momentum[thread_idx, 0] += velocities[i, 0]
            momentum[thread_idx, 1] += velocities[i, 1]
            momentum[thread_idx, 2] += velocities[i, 2]

    return momentum.sum(axis=0)

在我的 AMD 5700x 上对这个函数进行基准测试会得到这个图表:

enter image description here

现在你看到并行版本是最快的了。

© www.soinside.com 2019 - 2024. All rights reserved.