尝试优化我的 ARPLS 实施

问题描述 投票:0回答:1

我正在尝试优化我的光谱基线拟合算法(基于这个),因为根据我的理解,NumPy 的

apply_along_axis
性能并不比简单地循环遍历数组的索引高得多。

目前处理约 4000 个波长的光谱需要约 0.15 秒,但至少可以说,如果应用于数百万个光谱,则需要一段时间。我是否缺少一个明显的矢量化解决方案,或者该算法只是那么繁重?此处提供示例光谱文件,以及其各自的基线

import numpy as np
from numpy.typing import NDArray
from scipy.linalg import norm
from scipy.sparse import csc_array, csr_array, dia_array, diags_array, eye_array
from scipy.sparse.linalg import spsolve


# Sparse array implementation
def arpls(
    spectrum: NDArray[np.floating],
    smoothing_factor: float = 1E6,
    convergence_tolerance: float = 0.05,
    differentiation_order: int = 2,
    max_iterations: int = 100,
    edge_padding: int = 100,
) -> NDArray[np.floating]:
    """
    Fits a baseline signal to the given spectrum using Asymmetrically Reweighted Penalized Least Squares smoothing.

    ### Parameters
    - spectrum: `NDArray[np.floating]` -- The 1D spectrum input array.
    - smoothing_factor: `float`, optional -- Smoothing strength parameter.\
        The larger the parameter, the smoother the resulting baseline.\
        High values suppress small fluctuations, which may lead to a smoother but less sensitive baseline.
    - convergence_tolerance: `float`, optional -- Convergence criterion based on the change in weights between iterations.\
        Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
    - differentiation_order: `int`, optional -- The order of the difference operator used in penalization.\
        An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
        Higher orders allow for more complex baseline shapes.
    - max_iterations: `int`, optional -- Maximum number of iterations to perform.\
        The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
    - edge_padding: `int`, optional -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
        Padding is done using edge values to minimize boundary effects.

    ### Returns
    - `NDArray[np.floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).

    ### Raises
    - `ValueError` -- If the ratio is not between 0 and 1.

    ### Reference
    - https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls

    ### Notes
    The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
    The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
    """

    # Step 1: Pad the spectrum to reduce boundary effects during baseline fitting.
    padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode="edge")
    # Step 2: Initialize the weights array.
    current_weights = np.ones_like(padded_spectrum)

    # Step 3: Initialize a difference array for enforcing smoothness
    differences: csr_array = eye_array(padded_spectrum.shape[0], dtype=np.float64, format="csr")
    for _ in range(differentiation_order):
        # Update the difference array to account for higher-order differentials if required
        differences = differences[1:] - differences[:-1]

    # Step 4: Compute the smoothness penalties using the difference matrix and smoothing factor
    smoothness_penalies: csc_array = smoothing_factor * (differences.T @ differences)

    # Step 5: Iteratively refine the estimated baseline
    baseline = np.zeros_like(padded_spectrum)
    for _ in range(max_iterations):
        # Step 5a: Generate a diagonal weights array from the current weights
        diagonal_weights: dia_array = diags_array(current_weights, dtype=np.float64, format="dia")
        # Step 5b: Solve for the baseline using the current weights and smoothness penalties
        baseline[:] = spsolve(diagonal_weights + smoothness_penalies, current_weights * padded_spectrum, permc_spec="NATURAL")

        # Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
        residuals = padded_spectrum - baseline
        # Step 5d: Calculate statistics of the negative residuals. Negative residuals indicate regions where the baseline may be overestimated
        negative_residuals = residuals[residuals < 0]
        nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
        nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)

        # Step 5e: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
        # Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
        exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
        exponents.clip(-500, 500, out=exponents)
        updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))

        # Step 5f: Check for convergence by measuring how much the weights have changed
        if (norm(current_weights - updated_weights) / norm(current_weights)) < convergence_tolerance:
            break # Stop iterating if the change in weights is below the specified tolerance
        current_weights[:] = updated_weights

    # Step 6: Remove the padding and return the baseline corresponding to the original spectrum
    return baseline[edge_padding:-edge_padding]

# (M, N) spectra, where M is the number of signals acquired and N, the amount of wavelengths each signal contains
spectra = np.loadtxt("spectra.csv", dtype=np.float32, delimiter=",")

baseline = np.apply_along_axis(arpls, axis=1, arr=spectra)
np.savetxt("baseline.csv", baseline, delimiter=",")
python numpy optimization scipy statistics
1个回答
0
投票

我不认为有什么能让这个变得更快 - 它大部分时间都花在

spsolve()
上,并且已经用 C 编写了。为了让它更快,你需要以某种方式避免求解这个矩阵,我对这里的数学不太了解,无法想到不涉及求解矩阵的替代方案。

但是,我确实找到了两个想法,可以使速度提高约 36%。

  • 更快的对角线操作
  • 缓存惩罚矩阵的创建

我做的第一件事就是分析该程序。我注意到该程序在 spsolve 行上花费了大量时间,因此我将该行分成多行。

A = diagonal_weights + smoothness_penalies
b = current_weights * padded_spectrum
baseline[:] = spsolve(A, b, permc_spec="NATURAL")

然后我对此进行了简介。

    69      1203  569457223.0 473364.3     22.6          A = diagonal_weights + smoothness_penalies
    70      1203    4759976.0   3956.8      0.2          b = current_weights * padded_spectrum
    71      1203 1352957577.0    1e+06     53.7          baseline[:] = spsolve(A, b, permc_spec="NATURAL")

出于我刚才提到的原因,花费 53.7% 的时间在 spsolve 上是很难减少的。然而,22.6% 的时间也花在了将两个矩阵相加上,这对我来说似乎很疯狂。我认为这是将 DIA 转换为 CSC 的开销的某种组合,并且 CSC + CSC 添加速度很慢。

更快的对角线操作

作为替代方案,我尝试识别对角元素的索引、这些元素的初始值,然后使用 NumPy 直接操作对角元素。

# Sparse array implementation
def arpls(
    spectrum: NDArray[np.floating],
    smoothing_factor: float = 1E6,
    convergence_tolerance: float = 0.05,
    differentiation_order: int = 2,
    max_iterations: int = 100,
    edge_padding: int = 100,
) -> NDArray[np.floating]:
    """
    Fits a baseline signal to the given spectrum using Asymmetrically Reweighted Penalized Least Squares smoothing.

    ### Parameters
    - spectrum: `NDArray[np.floating]` -- The 1D spectrum input array.
    - smoothing_factor: `float`, optional -- Smoothing strength parameter.\
        The larger the parameter, the smoother the resulting baseline.\
        High values suppress small fluctuations, which may lead to a smoother but less sensitive baseline.
    - convergence_tolerance: `float`, optional -- Convergence criterion based on the change in weights between iterations.\
        Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
    - differentiation_order: `int`, optional -- The order of the difference operator used in penalization.\
        An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
        Higher orders allow for more complex baseline shapes.
    - max_iterations: `int`, optional -- Maximum number of iterations to perform.\
        The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
    - edge_padding: `int`, optional -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
        Padding is done using edge values to minimize boundary effects.

    ### Returns
    - `NDArray[np.floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).

    ### Raises
    - `ValueError` -- If the ratio is not between 0 and 1.

    ### Reference
    - https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls

    ### Notes
    The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
    The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
    """

    # Step 1: Pad the spectrum to reduce boundary effects during baseline fitting.
    padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode="edge")
    # Step 2: Initialize the weights array.
    current_weights = np.ones_like(padded_spectrum)

    # Step 3: Initialize a difference array for enforcing smoothness
    differences: csr_array = eye_array(padded_spectrum.shape[0], dtype=np.float64, format="csr")
    for _ in range(differentiation_order):
        # Update the difference array to account for higher-order differentials if required
        differences = differences[1:] - differences[:-1]

    # Step 4: Compute the smoothness penalties using the difference matrix and smoothing factor
    smoothness_penalies: csc_array = smoothing_factor * (differences.T @ differences)
    smoothness_penalies = smoothness_penalies.tocoo()
    smoothness_penalies.sum_duplicates()
    # smoothness_penalies must have canonical format, or changing to csr could change
    # data order
    assert smoothness_penalies.has_canonical_format
    initial_diag_index = np.where(smoothness_penalies.row == smoothness_penalies.col)[0]
    
    initial_diag = smoothness_penalies.data[initial_diag_index]
    smoothness_penalies = smoothness_penalies.tocsr()
    assert (smoothness_penalies.data[initial_diag_index] == smoothness_penalies.diagonal()).all()
    A = smoothness_penalies.copy()

    # Step 5: Iteratively refine the estimated baseline
    baseline = np.zeros_like(padded_spectrum)
    for _ in range(max_iterations):
        # Step 5a: Generate a diagonal weights array from the current weights
        A.data[initial_diag_index] = initial_diag + current_weights
        # Step 5b: Solve for the baseline using the current weights and smoothness penalties
        b = current_weights * padded_spectrum
        baseline[:] = spsolve(A, b, permc_spec="NATURAL")

        # Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
        residuals = padded_spectrum - baseline
        # Step 5d: Calculate statistics of the negative residuals. Negative residuals indicate regions where the baseline may be overestimated
        negative_residuals = residuals[residuals < 0]
        nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
        nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)

        # Step 5e: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
        # Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
        exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
        exponents.clip(-500, 500, out=exponents)
        updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))

        # Step 5f: Check for convergence by measuring how much the weights have changed
        if (norm(current_weights - updated_weights) / norm(current_weights)) < convergence_tolerance:
            break # Stop iterating if the change in weights is below the specified tolerance
        current_weights[:] = updated_weights

    # Step 6: Remove the padding and return the baseline corresponding to the original spectrum
    return baseline[edge_padding:-edge_padding]

这大约快了 26%。

缓存惩罚矩阵的创建

接下来,我查看了

smoothness_penalies
矩阵的创建。关于这个矩阵需要注意的关键一点是它取决于三个因素:频谱的大小、平滑因子和微分阶数。每次调用
arpls()
时这些都是相同的。由于
arpls()
被调用多次,因此这是 memoization 的良好候选者。

为此,首先将

smoothness_penalies
的创建移至其自己的函数中。接下来,应用 functools.lru_cache() 装饰器。这将设置
smoothness_penalies
所花费的时间从执行时间的 16% 减少到 0.2%。

import numpy as np
from numpy.typing import NDArray
from scipy.linalg import norm
from scipy.sparse import csc_array, csr_array, dia_array, diags_array, eye_array
from scipy.sparse.linalg import spsolve
from functools import lru_cache


@lru_cache
def initialize_smoothness_penalies(size, differentiation_order, smoothing_factor):
    # Step 3: Initialize a difference array for enforcing smoothness
    differences: csr_array = eye_array(size, dtype=np.float64, format="csr")
    for _ in range(differentiation_order):
        # Update the difference array to account for higher-order differentials if required
        differences = differences[1:] - differences[:-1]

    # Step 4: Compute the smoothness penalties using the difference matrix and smoothing factor
    smoothness_penalies: csc_array = smoothing_factor * (differences.T @ differences)
    smoothness_penalies = smoothness_penalies.tocoo()
    smoothness_penalies.sum_duplicates()
    # smoothness_penalies must have canonical format, or changing to csr could change
    # data order
    assert smoothness_penalies.has_canonical_format
    initial_diag_index = np.where(smoothness_penalies.row == smoothness_penalies.col)[0]
    
    initial_diag = smoothness_penalies.data[initial_diag_index]
    smoothness_penalies = smoothness_penalies.tocsr()
    assert (smoothness_penalies.data[initial_diag_index] == smoothness_penalies.diagonal()).all()
    return smoothness_penalies, initial_diag, initial_diag_index


# Sparse array implementation
def arpls(
    spectrum: NDArray[np.floating],
    smoothing_factor: float = 1E6,
    convergence_tolerance: float = 0.05,
    differentiation_order: int = 2,
    max_iterations: int = 100,
    edge_padding: int = 100,
) -> NDArray[np.floating]:
    """
    Fits a baseline signal to the given spectrum using Asymmetrically Reweighted Penalized Least Squares smoothing.

    ### Parameters
    - spectrum: `NDArray[np.floating]` -- The 1D spectrum input array.
    - smoothing_factor: `float`, optional -- Smoothing strength parameter.\
        The larger the parameter, the smoother the resulting baseline.\
        High values suppress small fluctuations, which may lead to a smoother but less sensitive baseline.
    - convergence_tolerance: `float`, optional -- Convergence criterion based on the change in weights between iterations.\
        Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
    - differentiation_order: `int`, optional -- The order of the difference operator used in penalization.\
        An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
        Higher orders allow for more complex baseline shapes.
    - max_iterations: `int`, optional -- Maximum number of iterations to perform.\
        The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
    - edge_padding: `int`, optional -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
        Padding is done using edge values to minimize boundary effects.

    ### Returns
    - `NDArray[np.floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).

    ### Raises
    - `ValueError` -- If the ratio is not between 0 and 1.

    ### Reference
    - https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls

    ### Notes
    The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
    The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
    """

    # Step 1: Pad the spectrum to reduce boundary effects during baseline fitting.
    padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode="edge")
    # Step 2: Initialize the weights array.
    current_weights = np.ones_like(padded_spectrum)
    smoothness_penalies, initial_diag, initial_diag_index = initialize_smoothness_penalies(padded_spectrum.shape[0], differentiation_order, smoothing_factor)

    A = smoothness_penalies.copy()

    # Step 5: Iteratively refine the estimated baseline
    baseline = np.zeros_like(padded_spectrum)
    for _ in range(max_iterations):
        # Step 5a: Generate a diagonal weights array from the current weights
        A.data[initial_diag_index] = initial_diag + current_weights
        # Step 5b: Solve for the baseline using the current weights and smoothness penalties
        b = current_weights * padded_spectrum
        baseline[:] = spsolve(A, b, permc_spec="NATURAL")

        # Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
        residuals = padded_spectrum - baseline
        # Step 5d: Calculate statistics of the negative residuals. Negative residuals indicate regions where the baseline may be overestimated
        negative_residuals = residuals[residuals < 0]
        nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
        nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)

        # Step 5e: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
        # Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
        exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
        exponents.clip(-500, 500, out=exponents)
        updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))

        # Step 5f: Check for convergence by measuring how much the weights have changed
        if (norm(current_weights - updated_weights) / norm(current_weights)) < convergence_tolerance:
            break # Stop iterating if the change in weights is below the specified tolerance
        current_weights[:] = updated_weights

    # Step 6: Remove the padding and return the baseline corresponding to the original spectrum
    return baseline[edge_padding:-edge_padding]

将此与之前的优化相结合,比原始实现快大约 36%。

与原始结果比较

结果在原始结果的 10-7 范围内,这是通过将 A 的输入更改为 spsolve 从 CSC 到 CSR 来驱动的。

您可以重写initialize_smoothness_penalies以在CSC中提供结果,但这确实会使spsolve变慢。

进一步工作

以下是我研究过但无法实施的一系列想法。你可能会有更多的运气。

  • 迭代求解器:spsolve 是一个直接求解器,但 scipy 有许多迭代求解器。理论上,如果您可以确定 x0 的良好初始猜测(例如上次迭代的基线),那么您可以在很少的迭代中获得答案。在实践中,我无法让它比直接求解器更快。我发现的最好的迭代求解器是 cg,但它仍然较慢。
  • umfpack:SciPy 文档中有一条注释,暗示基于 umfpack 的 spsolve 比 SciPy 的解决方案更快。但是,这取决于 scikit-umfpack 软件包,我无法安装该软件包。
© www.soinside.com 2019 - 2024. All rights reserved.