如何加快这个特定示例的插值速度?

问题描述 投票:0回答:1

我制作了一个脚本,使用 pandas 进行数据处理,使用 Numba 提高计算效率,对一组点执行三线性插值。目前,如果考虑 $10^{5}$ 点,则需要 $\mathcal{O}(1) ext{ s}$。

这是代码,假设有一些测试表格数据:

import numpy as np
import pandas as pd
from numba import jit

# Define the symbolic function
def custom_function(x, y, z):
    return np.sin(y) * np.cos(3 * y)**(1 + 5 * x) * np.exp(-np.sqrt(z**2 + x**2) * np.cos(3 * y) / 20) / z

# Define the grid ranges
x_range = np.arange(0.5, 5.5, 0.5)
y_range = np.logspace(np.log10(0.0001), np.log10(0.1), int((np.log10(0.1) - np.log10(0.0001)) / 0.1) + 1)
z_range = np.arange(0.5, 101, 5)

# Generate the DataFrame
data = {'x': [], 'y': [], 'z': [], 'f': []}

for x in x_range:
    for y in y_range:
        for z in z_range:
            data['x'].append(x)
            data['y'].append(y)
            data['z'].append(z)
            data['f'].append(custom_function(x, y, z))

df = pd.DataFrame(data)

# Define the tri-linear interpolation function using Numba
@jit(nopython=True, parallel=True)
def trilinear_interpolation(rand_points, grid_x, grid_y, grid_z, distr):
    results = np.empty(len(rand_points))
    len_y, len_z = grid_y.shape[0], grid_z.shape[0]

    for i in range(len(rand_points)):
        x, y, z = rand_points[i]
        
        idx_x1 = np.searchsorted(grid_x, x) - 1
        idx_x2 = idx_x1 + 1
        idx_y1 = np.searchsorted(grid_y, y) - 1
        idx_y2 = idx_y1 + 1
        idx_z1 = np.searchsorted(grid_z, z) - 1
        idx_z2 = idx_z1 + 1
        
        idx_x1 = max(0, min(idx_x1, len(grid_x) - 2))
        idx_x2 = max(1, min(idx_x2, len(grid_x) - 1))
        idx_y1 = max(0, min(idx_y1, len_y - 2))
        idx_y2 = max(1, min(idx_y2, len_y - 1))
        idx_z1 = max(0, min(idx_z1, len_z - 2))
        idx_z2 = max(1, min(idx_z2, len_z - 1))

        x1, x2 = grid_x[idx_x1], grid_x[idx_x2]
        y1, y2 = grid_y[idx_y1], grid_y[idx_y2]
        z1, z2 = grid_z[idx_z1], grid_z[idx_z2]

        z111 = distr[idx_x1, idx_y1, idx_z1]
        z211 = distr[idx_x2, idx_y1, idx_z1]
        z121 = distr[idx_x1, idx_y2, idx_z1]
        z221 = distr[idx_x2, idx_y2, idx_z1]
        z112 = distr[idx_x1, idx_y1, idx_z2]
        z212 = distr[idx_x2, idx_y1, idx_z2]
        z122 = distr[idx_x1, idx_y2, idx_z2]
        z222 = distr[idx_x2, idx_y2, idx_z2]

        xd = (x - x1) / (x2 - x1)
        yd = (y - y1) / (y2 - y1)
        zd = (z - z1) / (z2 - z1)

        c00 = z111 * (1 - xd) + z211 * xd
        c01 = z112 * (1 - xd) + z212 * xd
        c10 = z121 * (1 - xd) + z221 * xd
        c11 = z122 * (1 - xd) + z222 * xd

        c0 = c00 * (1 - yd) + c10 * yd
        c1 = c01 * (1 - yd) + c11 * yd

        result = c0 * (1 - zd) + c1 * zd

        results[i] = np.exp(result)

    return results

# Provided x value
fixed_x = 2.5  # example provided x value

# Random points for which we need to perform tri-linear interpolation
num_rand_points = 100000  # Large number of random points
rand_points = np.column_stack((
    np.full(num_rand_points, fixed_x),
    np.random.uniform(0.0001, 0.1, num_rand_points),
    np.random.uniform(0.5, 101, num_rand_points)
))

# Prepare the grid and distribution values
grid_x = np.unique(df['x'])
grid_y = np.unique(df['y'])
grid_z = np.unique(df['z'])
distr = np.zeros((len(grid_x), len(grid_y), len(grid_z)))

for i in range(len(df)):
    ix = np.searchsorted(grid_x, df['x'].values[i])
    iy = np.searchsorted(grid_y, df['y'].values[i])
    iz = np.searchsorted(grid_z, df['z'].values[i])
    distr[ix, iy, iz] = df['f'].values[i]

# Perform tri-linear interpolation
interpolated_values = trilinear_interpolation(rand_points, grid_x, grid_y, grid_z, distr)

# Display the results
for point, value in zip(rand_points[:10], interpolated_values[:10]):
    print(f"Point {point}: Interpolated value: {value}")

我想知道是否有任何优化技术或最佳实践可以应用来进一步加速此代码,特别是考虑到所有 x 值都是固定的。任何建议或建议将不胜感激!

python pandas performance interpolation numba
1个回答
0
投票

首先,

grid_x
grid_y
grid_z
很小,因此二分查找并不是查找值的最有效方法。 对于小数组来说,基本线性搜索速度更快。这是一个实现:

@nb.njit('(float64[::1], float64)', inline='always')
def searchsorted_opt(arr, val):
    i = 0
    while i < arr.size and val > arr[i]:
        i += 1
    return i

当数组中的项目明显增多时,您可以从数组的中间开始,并在 N 上跳过 1 个项目(通常使用较小的 N)。

当数组很大时,二分查找成为一种快速的解决方案。人们可以构建索引来避免缓存未命中或使用缓存友好的数据结构,例如B-tree。实际上,我不希望这样的数据结构对您的情况有用,因为您在 3D 网格上操作,因此 3 个数组当然应该总是相当小。

另一种解决方案是根据

grid_*
数组中的值构建查找表 (LUT)。对于均匀分布的项目,您可以执行类似
idx = LUT[int(searchedValue * stride + offset)]
的操作。在其他情况下,您可以在整数转换之前计算多项式校正,以便 LUT 访问保持一致并保持较小。对于平滑函数,您可以直接计算函数或其多项式近似值,然后截断结果 - 不需要 LUT。但同样,只有当
grid_*
数组明显更大时,这才值得。

此外,您的代码目前受益于多线程。您需要明确使用

prange
而不是 max9111 在评论中指出的
range

最后,您可以指定签名,以避免可能的延迟编译时间,如 dankal444 所指出的。

这是生成的代码:

import numba as nb
@nb.njit('(float64[:,::1], float64[::1], float64[::1], float64[::1], float64[:,:,::1])', parallel=True)
def trilinear_interpolation(rand_points, grid_x, grid_y, grid_z, distr):
    results = np.empty(len(rand_points))
    len_y, len_z = grid_y.shape[0], grid_z.shape[0]

    for i in nb.prange(len(rand_points)):
        x, y, z = rand_points[i]

        idx_x1 = searchsorted_opt(grid_x, x) - 1
        idx_x2 = idx_x1 + 1
        idx_y1 = searchsorted_opt(grid_y, y) - 1
        idx_y2 = idx_y1 + 1
        idx_z1 = searchsorted_opt(grid_z, z) - 1
        idx_z2 = idx_z1 + 1

        idx_x1 = max(0, min(idx_x1, len(grid_x) - 2))
        idx_x2 = max(1, min(idx_x2, len(grid_x) - 1))
        idx_y1 = max(0, min(idx_y1, len_y - 2))
        idx_y2 = max(1, min(idx_y2, len_y - 1))
        idx_z1 = max(0, min(idx_z1, len_z - 2))
        idx_z2 = max(1, min(idx_z2, len_z - 1))

        x1, x2 = grid_x[idx_x1], grid_x[idx_x2]
        y1, y2 = grid_y[idx_y1], grid_y[idx_y2]
        z1, z2 = grid_z[idx_z1], grid_z[idx_z2]

        z111 = distr[idx_x1, idx_y1, idx_z1]
        z211 = distr[idx_x2, idx_y1, idx_z1]
        z121 = distr[idx_x1, idx_y2, idx_z1]
        z221 = distr[idx_x2, idx_y2, idx_z1]
        z112 = distr[idx_x1, idx_y1, idx_z2]
        z212 = distr[idx_x2, idx_y1, idx_z2]
        z122 = distr[idx_x1, idx_y2, idx_z2]
        z222 = distr[idx_x2, idx_y2, idx_z2]

        xd = (x - x1) / (x2 - x1)
        yd = (y - y1) / (y2 - y1)
        zd = (z - z1) / (z2 - z1)

        c00 = z111 * (1 - xd) + z211 * xd
        c01 = z112 * (1 - xd) + z212 * xd
        c10 = z121 * (1 - xd) + z221 * xd
        c11 = z122 * (1 - xd) + z222 * xd

        c0 = c00 * (1 - yd) + c10 * yd
        c1 = c01 * (1 - yd) + c11 * yd

        result = c0 * (1 - zd) + c1 * zd

        results[i] = np.exp(result)

    return results

请注意,通过将

np.exp
移出循环并在第二步中进行计算,可以使用 SIMD 友好库 Intel SVML(仅适用于 x86-64 CPU)和多线程更快地计算
np.exp
(同时确保 SVML 可以被 Numba 在目标平台上使用)。话虽如此,加速应该很小,因为
np.exp
只占用执行时间的一小部分。

所提供的代码在我的 6 核 i5-9600KF CPU 上速度快了 6.7 倍。我认为这不能在主流 CPU 上使用 Numba 进一步优化(除了使用上述方法)。至少,对于当前的目标输入来说肯定是不可能的(特别是因为所有内容都适合我机器上的 L3 缓存,distr

甚至适合 L2 缓存)。

© www.soinside.com 2019 - 2024. All rights reserved.