优化数据帧列的大量矩阵乘法

问题描述 投票:0回答:1

我正在编写一个代码来根据飞机的方向和全局速度矢量计算飞机的迎角和侧滑角。我的所有数据都保存在数据框中。为了计算角度,我首先需要确定飞机的相对速度,然后用这段代码来完成。从那里获得迎角和侧滑角就很简单了。

这段代码可以工作,但我觉得我缺少一些可以让它运行得更快的东西。有人有什么想法吗?

    # Calculate aircraft component velocities
    df['a11'] = np.cos(df['Pitch']) * np.cos(df['Yaw'])
    df['a12'] = np.cos(df['Pitch']) * np.sin(df['Yaw'])
    df['a13'] = -1 * np.sin(df['Pitch'])
    
    df['a21'] = np.sin(df['Pitch']) * np.sin(df['Roll']) * np.cos(df['Yaw']) - np.cos(df['Roll']) * np.sin(df['Yaw'])
    df['a22'] = np.sin(df['Pitch']) * np.sin(df['Roll']) * np.sin(df['Yaw']) + np.cos(df['Roll']) * np.cos(df['Yaw'])
    df['a23'] = np.sin(df['Roll']) * np.cos(df['Pitch'])
    
    df['a31'] = np.cos(df['Yaw']) * np.cos(df['Roll']) * np.sin(df['Pitch']) + np.sin(df['Roll']) * np.sin(df['Yaw'])
    df['a32'] = np.sin(df['Yaw']) * np.cos(df['Roll']) * np.sin(df['Pitch']) - np.sin(df['Roll']) * np.cos(df['Yaw'])
    df['a33'] = np.cos(df['Roll']) * np.cos(df['Pitch'])
    
    df['Vxx'] = 0
    df['Vyy'] = 0
    df['Vzz'] = 0
    
    for idx, row in df.iterrows():
        A = [[row['a11'], row['a12'], row['a13']],
              [row['a21'], row['a22'], row['a23']],
              [row['a31'], row['a32'], row['a33']]]
        B = [[row['Vx']], [row['Vy']], [row['Vz']]]
        
        result = list()
        for i in range(len(A)):
            result.append([0])
        
        for i in range(3):
            for j in range(1):
                for k in range(len(B)):
                    result[i][j] += A[i][k] * B[k][j]
                    
        df.loc[idx, 'Vxx'] = result[0][0]
        df.loc[idx, 'Vyy'] = result[1][0]
        df.loc[idx, 'Vzz'] = result[2][0]
python pandas dataframe matrix-multiplication euler-angles
1个回答
0
投票

你绝对可以做得更快,在某些方面比其他方面更明显。将
pandas.DataFrame
转换为
numpy.ndarray
可以提供相当显着的加速(在我下面的测试中快 14 倍),然后减少一点循环可以进一步提高,最后预先计算所有昂贵的 sin 和 cos 数学可以帮助还有更多,使其速度快了令人难以置信的 40 倍。

我做的第一件事是将你的逻辑放入一个函数中,以便我可以使用它来生成测试结果来检查其他算法。接下来我创建了一个函数来生成测试数据。最后的设置步骤是创建一个主程序,运行并分析原始版本并保留输出,然后执行改进的版本并比较结果。


pandas.DataFrame
numpy.ndarray

优化算法的第一步是将

pandas.DataFrame
转换为
numpy.ndarray
,这非常简单,因为原始数据中的所有字段似乎都是浮点数。然后为附加字段分配一个带有额外空间的输出数组,并放入起始数据。大部分工作是将
pandas
逻辑转换为
numpy
逻辑。最后,将最终的
numpy.ndarray
转换回
pandas.DataFrame
以从函数返回。仅此一项就产生了约 14 倍的加速。

这是第一遍的结果:

COLUMN_NAMES_RAW = [
    'Pitch',
    'Yaw',
    'Roll',
    'Vx',
    'Vy',
    'Vz',
    'Vxx',
    'Vyy',
    'Vzz',
]

COLUMN_NAMES_ADDN = [
    'a11',
    'a12',
    'a13',
    'a21',
    'a22',
    'a23',
    'a31',
    'a32',
    'a33'
]

def fun_np(df: pd.DataFrame) -> pd.DataFrame:
    # Choose a data type for the operations
    #   Unfortunately, float32 is too imprecise and it fails the test
    dtype = np.float64

    # The output shape to use, which has all the input columns + all the additional columns
    total_shape = (len(df), len(COLUMN_NAMES_RAW) + len(COLUMN_NAMES_ADDN))
    # Allocate an empty array
    #   There is no need to prefill since all the positions will be assigned to
    array = np.empty(total_shape, dtype=dtype)
    # Load the array with all the input data
    array[:, :len(COLUMN_NAMES_RAW)] = df.to_numpy(dtype=dtype)

    # pitch - 0
    # Vz - 3
    # Vxx - 6
    # a11 - 9
    # a21 - 12
    # a31 - 15

    rows = array.shape[0]
    for i in range(rows):
        row = array[i, :]
        row[9] = np.cos(row[0]) * np.cos(row[1])
        row[10] = np.cos(row[0]) * np.sin(row[1])
        row[11] = -1 * np.sin(row[0])

        row[12] = np.sin(row[0]) * np.sin(row[2]) * np.cos(row[1]) - np.cos(row[2]) * np.sin(row[1])
        row[13] = np.sin(row[0]) * np.sin(row[2]) * np.sin(row[1]) + np.cos(row[2]) * np.cos(row[1])
        row[14] = np.sin(row[2]) * np.cos(row[0])

        row[15] = np.cos(row[1]) * np.cos(row[2]) * np.sin(row[0]) + np.sin(row[2]) * np.sin(row[1])
        row[16] = np.sin(row[1]) * np.cos(row[2]) * np.sin(row[0]) - np.sin(row[2]) * np.cos(row[1])
        row[17] = np.cos(row[2]) * np.cos(row[0])

        line_res = np.zeros(shape=(3,))
        a = np.array([
            row[9:12],
            row[12:15],
            row[15:18]
        ])
        b = row[3:6]

        for j in range(3):
            for k in range(3):
                line_res[j] += a[j][k] * b[k]
        row[6: 9] = line_res

    # Convert the array back into a DataFrame
    df2 = pd.DataFrame(array, columns=COLUMN_NAMES_RAW + COLUMN_NAMES_ADDN)

    return df2

你可以停在这里,已经有了速度快 14 倍的算法,但是......


简化循环逻辑

最后的循环逻辑似乎也很容易受到优化,因此我重新排列它以最大限度地减少所需的操作数量,并删除

A
B
result
分配,以用这个看似简单的循环代替它相同的输出:

for j in range(3):
    for k in range(3, 6):
        row[j + 6] += row[j * 3 + k + 6] * row[k]

它确实要求数组的这些字段预先填充 0,这还不错。

array[:, 6:9] = 0.0

这两项更改确实对可读性造成了一些影响,但使我们的速度从大约 14 倍提高到了 15 倍,这是适度的改进。你可以停在这里,但是......


预先计算三角函数

重新计算相同的 sin 和 cos 可能非常昂贵,因为其余大部分都是简单的矩阵分配。分解出这些计算然后重用结果可能会提供更好的性能,如下所示:

for i in range(array.shape[0]):
    row = array[i, :]
    # Precalculate each one to avoid repeated expensive trig function calls
    cos0 = np.cos(row[0])
    cos1 = np.cos(row[1])
    cos2 = np.cos(row[2])
    sin0 = np.sin(row[0])
    sin1 = np.sin(row[1])
    sin2 = np.sin(row[2])

    row[9] = cos0 * cos1
    row[10] = cos0 * sin1
    row[11] = -1 * sin0

    row[12] = sin0 * sin2 * cos1 - cos2 * sin1
    row[13] = sin0 * sin2 * sin1 + cos2 * cos1
    row[14] = sin2 * cos0

    row[15] = cos1 * cos2 * sin0 + sin2 * sin1
    row[16] = sin1 * cos2 * sin0 - sin2 * cos1
    row[17] = cos2 * cos0

令我惊讶的是,速度从原来的 15 倍提高到了 41 倍。

我研究过使用

numba
来编译逻辑位,但无法比现在更快地获得任何东西,似乎
numpy.sin
numpy.cos
已经相当优化,并且其中的其他内容从编译中受益匪浅与
numba


最终结果

这是可运行状态下的最终结果代码,包括计时、测试等所需的所有样板代码:

import time

import numpy as np
import pandas as pd


COLUMN_NAMES_RAW = [
    'Pitch',
    'Yaw',
    'Roll',
    'Vx',
    'Vy',
    'Vz',
    'Vxx',
    'Vyy',
    'Vzz',
]

COLUMN_NAMES_ADDN = [
    'a11',
    'a12',
    'a13',
    'a21',
    'a22',
    'a23',
    'a31',
    'a32',
    'a33'
]


def _gt(s: float = 0.0) -> float:
    return time.perf_counter() - s


def ref_fun(df: pd.DataFrame) -> pd.DataFrame:
    # Calculate aircraft component velocities
    df['a11'] = np.cos(df['Pitch']) * np.cos(df['Yaw'])
    df['a12'] = np.cos(df['Pitch']) * np.sin(df['Yaw'])
    df['a13'] = -1 * np.sin(df['Pitch'])

    df['a21'] = np.sin(df['Pitch']) * np.sin(df['Roll']) * np.cos(df['Yaw']) - np.cos(df['Roll']) * np.sin(df['Yaw'])
    df['a22'] = np.sin(df['Pitch']) * np.sin(df['Roll']) * np.sin(df['Yaw']) + np.cos(df['Roll']) * np.cos(df['Yaw'])
    df['a23'] = np.sin(df['Roll']) * np.cos(df['Pitch'])

    df['a31'] = np.cos(df['Yaw']) * np.cos(df['Roll']) * np.sin(df['Pitch']) + np.sin(df['Roll']) * np.sin(df['Yaw'])
    df['a32'] = np.sin(df['Yaw']) * np.cos(df['Roll']) * np.sin(df['Pitch']) - np.sin(df['Roll']) * np.cos(df['Yaw'])
    df['a33'] = np.cos(df['Roll']) * np.cos(df['Pitch'])

    df['Vxx'] = 0.0  # Changed to 0.0 from 0 to address pandas warning
    df['Vyy'] = 0.0
    df['Vzz'] = 0.0

    for idx, row in df.iterrows():
        A = [[row['a11'], row['a12'], row['a13']],
             [row['a21'], row['a22'], row['a23']],
             [row['a31'], row['a32'], row['a33']]]
        B = [[row['Vx']], [row['Vy']], [row['Vz']]]

        result = list()
        for i in range(len(A)):
            result.append([0])

        for i in range(3):
            for j in range(1):
                for k in range(len(B)):
                    result[i][j] += A[i][k] * B[k][j]

        df.loc[idx, 'Vxx'] = result[0][0]
        df.loc[idx, 'Vyy'] = result[1][0]
        df.loc[idx, 'Vzz'] = result[2][0]

    return df


def fun_np(df: pd.DataFrame) -> pd.DataFrame:
    # Choose a data type for the operations
    #   Unfortunately, float32 is too imprecise and it fails the test
    dtype = np.float64

    # The output shape to use, which has all the input columns + all the additional columns
    total_shape = (len(df), len(COLUMN_NAMES_RAW) + len(COLUMN_NAMES_ADDN))
    # Allocate an empty array
    #   There is no need to prefil since all the positions will be assigned to
    array = np.empty(total_shape, dtype=dtype)
    # Load the array with all the input data
    array[:, :len(COLUMN_NAMES_RAW)] = df.to_numpy(dtype=dtype)

    # pitch - 0
    # Vz - 3
    # Vxx - 6
    # a11 - 9
    # a21 - 12
    # a31 - 15

    rows = array.shape[0]
    for i in range(rows):
        row = array[i, :]
        row[9] = np.cos(row[0]) * np.cos(row[1])
        row[10] = np.cos(row[0]) * np.sin(row[1])
        row[11] = -1 * np.sin(row[0])

        row[12] = np.sin(row[0]) * np.sin(row[2]) * np.cos(row[1]) - np.cos(row[2]) * np.sin(row[1])
        row[13] = np.sin(row[0]) * np.sin(row[2]) * np.sin(row[1]) + np.cos(row[2]) * np.cos(row[1])
        row[14] = np.sin(row[2]) * np.cos(row[0])

        row[15] = np.cos(row[1]) * np.cos(row[2]) * np.sin(row[0]) + np.sin(row[2]) * np.sin(row[1])
        row[16] = np.sin(row[1]) * np.cos(row[2]) * np.sin(row[0]) - np.sin(row[2]) * np.cos(row[1])
        row[17] = np.cos(row[2]) * np.cos(row[0])

        line_res = np.zeros(shape=(3,))
        a = np.array([
            row[9:12],
            row[12:15],
            row[15:18]
        ])
        b = row[3:6]

        for j in range(3):
            for k in range(3):
                line_res[j] += a[j][k] * b[k]
        row[6: 9] = line_res

    # Convert the array back into a DataFrame
    df2 = pd.DataFrame(array, columns=COLUMN_NAMES_RAW + COLUMN_NAMES_ADDN)

    return df2


def fun_np2(df: pd.DataFrame) -> pd.DataFrame:
    dtype = np.float64

    total_shape = (len(df), len(COLUMN_NAMES_RAW) + len(COLUMN_NAMES_ADDN))
    array = np.empty(total_shape, dtype=dtype)
    array[:, :len(COLUMN_NAMES_RAW)] = df.to_numpy(dtype=dtype)

    # This is replacing the result list variable in the original code
    #   Just perform the operations in place to avoid the extra allocations etc
    array[:, 6:9] = 0.0

    for i in range(array.shape[0]):
        row = array[i, :]
        row[9] = np.cos(row[0]) * np.cos(row[1])
        row[10] = np.cos(row[0]) * np.sin(row[1])
        row[11] = -1 * np.sin(row[0])

        row[12] = np.sin(row[0]) * np.sin(row[2]) * np.cos(row[1]) - np.cos(row[2]) * np.sin(row[1])
        row[13] = np.sin(row[0]) * np.sin(row[2]) * np.sin(row[1]) + np.cos(row[2]) * np.cos(row[1])
        row[14] = np.sin(row[2]) * np.cos(row[0])

        row[15] = np.cos(row[1]) * np.cos(row[2]) * np.sin(row[0]) + np.sin(row[2]) * np.sin(row[1])
        row[16] = np.sin(row[1]) * np.cos(row[2]) * np.sin(row[0]) - np.sin(row[2]) * np.cos(row[1])
        row[17] = np.cos(row[2]) * np.cos(row[0])

        # Those loops can be greatly simplified using pointer math
        for j in range(3):
            for k in range(3, 6):
                row[j + 6] += row[j * 3 + k + 6] * row[k]

    df2 = pd.DataFrame(array, columns=COLUMN_NAMES_RAW + COLUMN_NAMES_ADDN)

    return df2


def fun_np3(df: pd.DataFrame) -> pd.DataFrame:
    dtype = np.float64

    total_shape = (len(df), len(COLUMN_NAMES_RAW) + len(COLUMN_NAMES_ADDN))
    array = np.empty(total_shape, dtype=dtype)
    array[:, :len(COLUMN_NAMES_RAW)] = df.to_numpy(dtype=dtype)

    array[:, 6:9] = 0.0

    for i in range(array.shape[0]):
        row = array[i, :]
        # Precalculate each one to avoid repeated expensive trig function calls
        cos0 = np.cos(row[0])
        cos1 = np.cos(row[1])
        cos2 = np.cos(row[2])
        sin0 = np.sin(row[0])
        sin1 = np.sin(row[1])
        sin2 = np.sin(row[2])

        row[9] = cos0 * cos1
        row[10] = cos0 * sin1
        row[11] = -1 * sin0

        row[12] = sin0 * sin2 * cos1 - cos2 * sin1
        row[13] = sin0 * sin2 * sin1 + cos2 * cos1
        row[14] = sin2 * cos0

        row[15] = cos1 * cos2 * sin0 + sin2 * sin1
        row[16] = sin1 * cos2 * sin0 - sin2 * cos1
        row[17] = cos2 * cos0

        for j in range(3):
            for k in range(3, 6):
                row[j + 6] += row[j * 3 + k + 6] * row[k]

    df2 = pd.DataFrame(array, columns=COLUMN_NAMES_RAW + COLUMN_NAMES_ADDN)

    return df2


def _make_data(n: int):

    rng = np.random.default_rng(seed=42)

    out_data = []
    for i in range(n):
        new_row = {}
        out_data.append(new_row)

        new_row['Pitch'] = rng.random() * 180.0 - 90.0
        new_row['Yaw'] = rng.random() * 360.0 - 180.0
        new_row['Roll'] = rng.random() * 360.0 - 180.0
        new_row['Vx'] = rng.random()
        new_row['Vy'] = rng.random()
        new_row['Vz'] = rng.random()
        new_row['Vxx'] = 0.0
        new_row['Vyy'] = 0.0
        new_row['Vzz'] = 0.0
    df = pd.DataFrame(out_data)
    return df

def _main():
    field_w1 = 24

    N = 10000

    data = _make_data(N)

    ref_data = data.copy()
    s = _gt()
    ref_data = ref_fun(ref_data)
    ref_time = _gt(s)
    print(ref_data.shape)
    print(f'{"Run time ref":<{field_w1}}{ref_time * 1000:12.1f} ms')

    for funname, fun in (
            ('numpy', fun_np),
            ('numpy2', fun_np2),
            ('numpy3', fun_np3),
            # ('numpy4', fun_np4),
    ):
        input_data = data.copy()
        s = _gt()
        com_data = fun(input_data)
        com_time = _gt(s)
        passed = com_data.equals(ref_data)
        print(f'{"Run time " + funname:<{field_w1}}{com_time * 1000:12.1f} ms'
              f'  -  {"Passed" if passed else "Failed"}'
              f'{"  {: 6.1f}x faster".format(ref_time / com_time) if passed else ""}')


if __name__ == '__main__':
    _main()

当我运行时,我得到以下高度总结的输出:

(10000, 18)
Run time ref                  3555.5 ms
Run time numpy                 256.1 ms  -  Passed    13.9x faster
Run time numpy2                231.1 ms  -  Passed    15.4x faster
Run time numpy3                 87.6 ms  -  Passed    40.6x faster

使用 Windows 10、Python 3.11.8、i9-10900K 进行测试

如果您有任何疑问或注意到任何其他改进,请告诉我

© www.soinside.com 2019 - 2024. All rights reserved.