我正在模拟具有相同质量和随机速度的
N
粒子。我想计算系统的总能量和总动量。我考虑了 4 种方法,并想对它们进行基准测试,看看哪一种效果最好:
numba.jit
并自己编写 for 循环。numba.jit
与 parallel=True
结合使用以利用多个核心。这是能量计算的代码:
from functools import wraps
import numpy as np
import numba as nb
import numpy.typing as npt
import perfplot
jit_opts = dict(
nopython=True, nogil=True, cache=False, error_model="numpy", fastmath=True
)
rng = np.random.default_rng()
def hybrid_parallel_jit(get_threshold, *hpj_args, default_threshold=None, **hpj_kwargs):
def _hybrid_parallel_jit(func):
if "parallel" in hpj_kwargs:
raise ValueError("Cannot specify 'parallel' for hybrid jitted function")
hpj_kwargs["parallel"] = True
func_parallel = nb.jit(*hpj_args, **hpj_kwargs)(func)
hpj_kwargs["parallel"] = False
func_non_parallel = nb.jit(*hpj_args, **hpj_kwargs)(func)
@wraps(func)
def hybrid_func(*args, threshold=None, **kwargs):
if threshold is None:
threshold = default_threshold
if threshold is None:
raise ValueError(
"`threshold` keyword argument was not provided, and a "
"default threshold was not set either."
)
return (
func_non_parallel(*args, **kwargs)
if get_threshold(*args, **kwargs) < threshold
else func_parallel(*args, **kwargs)
)
return hybrid_func
return _hybrid_parallel_jit
def random_unit_vectors(n: int) -> npt.NDArray[float]:
phi = rng.uniform(0, 2 * np.pi, n)
cos_theta = rng.uniform(-1, 1, n)
sin_theta = np.sqrt(1 - cos_theta**2)
return np.column_stack(
(sin_theta * np.cos(phi), sin_theta * np.sin(phi), cos_theta)
)
@nb.jit([nb.float64(nb.float64[:, :])], **jit_opts)
def total_energy(velocities: npt.NDArray[float]) -> float:
energy = 0
for i in range(velocities.shape[0]):
energy += velocities[i, 0] ** 2 + velocities[i, 1] ** 2 + velocities[i, 2] ** 2
return energy / 2
@nb.jit([nb.float64(nb.float64[:, :])], parallel=True, **jit_opts)
def total_energy_parallel(velocities: npt.NDArray[float]) -> float:
energy = 0
for i in nb.prange(velocities.shape[0]):
energy += velocities[i, 0] ** 2 + velocities[i, 1] ** 2 + velocities[i, 2] ** 2
return energy / 2
def total_energy_numpy(velocities: npt.NDArray[float]) -> float:
return np.einsum("ij, ij->", velocities, velocities) / 2
@hybrid_parallel_jit(
lambda v: v.shape[0],
[nb.float64(nb.float64[:, :])],
**jit_opts,
)
def total_energy_improved(velocities: npt.NDArray[float]) -> float:
energy = 0
for i in nb.prange(velocities.shape[0]):
energy += velocities[i, 0] ** 2 + velocities[i, 1] ** 2 + velocities[i, 2] ** 2
return energy / 2
perfplot.show(
setup=lambda n: random_unit_vectors(n),
kernels=[
lambda v: total_energy_numpy(v),
lambda v: total_energy(v),
lambda v: total_energy_parallel(v),
lambda v: total_energy_improved(v, threshold=10_000),
],
labels=["numpy", "numba", "numba parallel", "numba improved"],
n_range=[10**k for k in range(9)],
xlabel="len(a)",
)
如果
@hybrid_parallel_jit
大于指定阈值,则 N
装饰器基本上调用并行版本,否则调用非并行版本。我运行这个得到的结果:
这些结果符合我的预期:并行版本更适合大型
N
(在我的机器上大于 10,000),非并行版本更适合较小的 N
。看起来混合版本(选项 4)是整体最佳选择。
我修改了代码,对动量计算进行了类似的测试:
from functools import wraps
import numpy as np
import numba as nb
import numpy.typing as npt
import perfplot
jit_opts = dict(
nopython=True, nogil=True, cache=False, error_model="numpy", fastmath=True
)
rng = np.random.default_rng()
def hybrid_parallel_jit(get_threshold, *hpj_args, default_threshold=None, **hpj_kwargs):
def _hybrid_parallel_jit(func):
if "parallel" in hpj_kwargs:
raise ValueError("Cannot specify 'parallel' for hybrid jitted function")
hpj_kwargs["parallel"] = True
func_parallel = nb.jit(*hpj_args, **hpj_kwargs)(func)
hpj_kwargs["parallel"] = False
func_non_parallel = nb.jit(*hpj_args, **hpj_kwargs)(func)
@wraps(func)
def hybrid_func(*args, threshold=None, **kwargs):
if threshold is None:
threshold = default_threshold
if threshold is None:
raise ValueError(
"`threshold` keyword argument was not provided, and a "
"default threshold was not set either."
)
return (
func_non_parallel(*args, **kwargs)
if get_threshold(*args, **kwargs) < threshold
else func_parallel(*args, **kwargs)
)
return hybrid_func
return _hybrid_parallel_jit
def random_unit_vectors(n: int) -> npt.NDArray[float]:
phi = rng.uniform(0, 2 * np.pi, n)
cos_theta = rng.uniform(-1, 1, n)
sin_theta = np.sqrt(1 - cos_theta**2)
return np.column_stack(
(sin_theta * np.cos(phi), sin_theta * np.sin(phi), cos_theta)
)
@nb.jit([nb.float64[:](nb.float64[:, :])], **jit_opts)
def total_momentum(velocities: npt.NDArray[float]) -> float:
momentum = np.zeros(3)
for i in range(velocities.shape[0]):
momentum += velocities[i]
return momentum
@nb.jit([nb.float64[:](nb.float64[:, :])], parallel=True, **jit_opts)
def total_momentum_parallel(velocities: npt.NDArray[float]) -> float:
momentum = np.zeros(3)
for i in nb.prange(velocities.shape[0]):
momentum += velocities[i]
return momentum
def total_momentum_numpy(velocities: npt.NDArray[float]) -> float:
return np.sum(velocities, axis=0)
@hybrid_parallel_jit(
lambda v: v.shape[0],
[nb.float64[:](nb.float64[:, :])],
**jit_opts,
)
def total_momentum_improved(velocities: npt.NDArray[float]) -> float:
momentum = np.zeros(3)
for i in nb.prange(velocities.shape[0]):
momentum += velocities[i]
return momentum
perfplot.show(
setup=lambda n: random_unit_vectors(n),
kernels=[
lambda v: total_momentum_numpy(v),
lambda v: total_momentum(v),
lambda v: total_momentum_parallel(v),
lambda v: total_momentum_improved(v, threshold=10_000),
],
labels=["numpy", "numba", "numba parallel", "numba improved"],
n_range=[10**k for k in range(9)],
xlabel="len(a)",
)
动量计算与能量计算非常相似,因此我期望得到类似的图表和相同的结论,即选项 4 效果最好。然而,当我运行代码时,我得到:
非并行 numba 版本对于小型和大型
N
来说都是最快的。这些结果让我非常惊讶。动量的计算与能量的计算非常相似。在这两种情况下,我都会以某种方式对速度进行求和,这种方式在运行时成本方面似乎是直接的。有人可以解释为什么我对这两个非常相似的计算得到如此不同的结果吗?更具体地说,为什么使用并行化似乎对动量计算没有帮助,即使对于大的 N
也是如此?任何对这里发生的事情的见解将不胜感激。
查看您的功能:
@nb.jit([nb.float64[:](nb.float64[:, :])], parallel=True, **jit_opts)
def total_momentum_parallel(velocities: npt.NDArray[float]) -> float:
momentum = np.zeros(3)
for i in nb.prange(velocities.shape[0]):
momentum += velocities[i]
return momentum
表明每个线程都试图同时访问同一内存位置(在本例中为
momentum
)。
当然,这意味着需要一些开销(锁定)才能不损坏数据。尝试将行
momentum += velocities[i]
更改为:
momentum[0] += velocities[i, 0]
momentum[1] += velocities[i, 1]
momentum[2] += velocities[i, 2]
现在该功能将无法使用。
补救措施是为每个线程分配动量向量,并在最后对向量求和,例如:
@nb.jit([nb.float64[:](nb.float64[:, :])], parallel=True, **jit_opts)
def total_momentum_parallel(velocities: npt.NDArray[float]) -> float:
N = nb.get_num_threads()
rows = velocities.shape[0]
chunks = rows // N + 1
momentum = np.zeros(shape=(N, 3))
for thread_idx in nb.prange(N):
for i in range(chunks * thread_idx, chunks * (thread_idx + 1)):
if i >= rows:
break
momentum[thread_idx, 0] += velocities[i, 0]
momentum[thread_idx, 1] += velocities[i, 1]
momentum[thread_idx, 2] += velocities[i, 2]
return momentum.sum(axis=0)
在我的 AMD 5700x 上对这个函数进行基准测试会得到这个图表:
现在你看到并行版本是最快的了。