I'm trying to optimize my spectral baseline-fitting algorithm (based on this one), because as I understand it, NumPy's apply_along_axis is not significantly faster than simply looping over the array's indices.
Processing a spectrum of ~4000 wavelengths currently takes ~0.15 s, which, to put it mildly, takes a while when applied to millions of spectra. Am I missing an obvious vectorized solution, or is the algorithm just that heavy? A sample spectra file is provided here, along with its respective baseline.
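For context, apply_along_axis does no real vectorization here: under the hood it is essentially a Python-level loop over the rows, roughly equivalent to this sketch:
# Rough equivalent of np.apply_along_axis(arpls, axis=1, arr=spectra)
baselines = np.empty_like(spectra)
for i in range(spectra.shape[0]):
    baselines[i] = arpls(spectra[i])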
import numpy as np
from numpy.typing import NDArray
from scipy.linalg import norm
from scipy.sparse import csc_array, csr_array, dia_array, diags_array, eye_array
from scipy.sparse.linalg import spsolve
# Sparse array implementation
def arpls(
spectrum: NDArray[np.floating],
smoothing_factor: float = 1E6,
convergence_tolerance: float = 0.05,
differentiation_order: int = 2,
max_iterations: int = 100,
edge_padding: int = 100,
) -> NDArray[np.floating]:
"""
Fits a baseline signal to the given spectrum using Asymmetrically Reweighted Penalized Least Squares smoothing.
### Parameters
- spectrum: `NDArray[np.floating]` -- The 1D spectrum input array.
- smoothing_factor: `float`, optional -- Smoothing strength parameter.\
The larger the parameter, the smoother the resulting baseline.\
High values suppress small fluctuations, which may lead to a smoother but less sensitive baseline.
- convergence_tolerance: `float`, optional -- Convergence criterion based on the change in weights between iterations.\
Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
- differentiation_order: `int`, optional -- The order of the difference operator used in penalization.\
An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
Higher orders allow for more complex baseline shapes.
- max_iterations: `int`, optional -- Maximum number of iterations to perform.\
The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
- edge_padding: `int`, optional -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
Padding is done using edge values to minimize boundary effects.
### Returns
- `NDArray[np.floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).
### Raises
- `ValueError` -- If `convergence_tolerance` is not between 0 and 1.
### Reference
- https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls
### Notes
The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
"""
# Step 1: Pad the spectrum to reduce boundary effects during baseline fitting.
padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode="edge")
# Step 2: Initialize the weights array.
current_weights = np.ones_like(padded_spectrum)
# Step 3: Initialize a difference array for enforcing smoothness
differences: csr_array = eye_array(padded_spectrum.shape[0], dtype=np.float64, format="csr")
for _ in range(differentiation_order):
# Update the difference array to account for higher-order differentials if required
differences = differences[1:] - differences[:-1]
# Step 4: Compute the smoothness penalties using the difference matrix and smoothing factor
smoothness_penalies: csc_array = smoothing_factor * (differences.T @ differences)
# Step 5: Iteratively refine the estimated baseline
baseline = np.zeros_like(padded_spectrum)
for _ in range(max_iterations):
# Step 5a: Generate a diagonal weights array from the current weights
diagonal_weights: dia_array = diags_array(current_weights, dtype=np.float64, format="dia")
# Step 5b: Solve for the baseline using the current weights and smoothness penalties
baseline[:] = spsolve(diagonal_weights + smoothness_penalies, current_weights * padded_spectrum, permc_spec="NATURAL")
# Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
residuals = padded_spectrum - baseline
# Step 5d: Calculate statistics of the negative residuals. Negative residuals indicate regions where the baseline may be overestimated
negative_residuals = residuals[residuals < 0]
nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)
# Step 5e: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
# Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
exponents.clip(-500, 500, out=exponents)
updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))
# Step 5f: Check for convergence by measuring how much the weights have changed
if (norm(current_weights - updated_weights) / norm(current_weights)) < convergence_tolerance:
break # Stop iterating if the change in weights is below the specified tolerance
current_weights[:] = updated_weights
# Step 6: Remove the padding and return the baseline corresponding to the original spectrum
return baseline[edge_padding:-edge_padding]
# (M, N) spectra, where M is the number of signals acquired and N the number of wavelengths each signal contains
spectra = np.loadtxt("spectra.csv", dtype=np.float32, delimiter=",")
baseline = np.apply_along_axis(arpls, axis=1, arr=spectra)
np.savetxt("baseline.csv", baseline, delimiter=",")
I don't think there's much that can make this faster - it spends most of its time in spsolve(), which is already written in C. To make it faster you would somehow need to avoid solving the matrix at all, and I don't know the math here well enough to think of an alternative that doesn't involve solving that matrix.
However, I did find two ideas that together give roughly a 36% speedup.
The first thing I did was profile the program. I noticed that a large share of the time was spent on the spsolve line, so I split that line into multiple lines.
A = diagonal_weights + smoothness_penalies
b = current_weights * padded_spectrum
baseline[:] = spsolve(A, b, permc_spec="NATURAL")
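(For reference, the per-line numbers below come from a line profiler; a minimal sketch of one way to produce such output with the line_profiler package, assuming it is installed:)
from line_profiler import LineProfiler

profiler = LineProfiler()
arpls_profiled = profiler(arpls)  # wrap the function under test
arpls_profiled(spectra[0])        # run it once on a sample spectrum
profiler.print_stats()            # prints the per-line Hits / Time / % Time table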
Then I profiled that.
Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    69      1203  569457223.0 473364.3     22.6      A = diagonal_weights + smoothness_penalies
    70      1203    4759976.0   3956.8      0.2      b = current_weights * padded_spectrum
    71      1203 1352957577.0 1124653.0     53.7      baseline[:] = spsolve(A, b, permc_spec="NATURAL")
For the reasons I just mentioned, the 53.7% of the time spent in spsolve is going to be hard to reduce. However, 22.6% of the time is spent adding two matrices, which seems crazy to me. I assume it's some combination of the overhead of converting DIA to CSC, and CSC + CSC addition being slow.
As an alternative, I tried identifying the indices of the diagonal elements and their initial values up front, then manipulating the diagonal elements directly with NumPy.
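One way to sanity-check that hypothesis in isolation is a micro-benchmark along these lines (a sketch; n is an assumed size roughly matching the padded spectrum):
import timeit
import numpy as np
from scipy.sparse import diags_array, eye_array

n = 4200  # assumed: ~4000 wavelengths plus 2 * 100 edge padding
differences = eye_array(n, dtype=np.float64, format="csr")
for _ in range(2):  # differentiation_order = 2
    differences = differences[1:] - differences[:-1]
penalties = 1e6 * (differences.T @ differences)  # smoothing_factor * D.T @ D
weights = np.ones(n)

# Time just the DIA + sparse addition from the inner loop
t = timeit.timeit(lambda: diags_array(weights, format="dia") + penalties, number=100)
print(f"DIA + sparse addition: {t / 100 * 1e3:.3f} ms per call")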
# Sparse array implementation
def arpls(
spectrum: NDArray[np.floating],
smoothing_factor: float = 1E6,
convergence_tolerance: float = 0.05,
differentiation_order: int = 2,
max_iterations: int = 100,
edge_padding: int = 100,
) -> NDArray[np.floating]:
"""
Fits a baseline signal to the given spectrum using Asymmetrically Reweighted Penalized Least Squares smoothing.
### Parameters
- spectrum: `NDArray[np.floating]` -- The 1D spectrum input array.
- smoothing_factor: `float`, optional -- Smoothing strength parameter.\
The larger the parameter, the smoother the resulting baseline.\
High values suppress small fluctuations, which may lead to a smoother but less sensitive baseline.
- convergence_tolerance: `float`, optional -- Convergence criterion based on the change in weights between iterations.\
Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
- differentiation_order: `int`, optional -- The order of the difference operator used in penalization.\
An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
Higher orders allow for more complex baseline shapes.
- max_iterations: `int`, optional -- Maximum number of iterations to perform.\
The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
- edge_padding: `int`, optional -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
Padding is done using edge values to minimize boundary effects.
### Returns
- `NDArray[np.floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).
### Raises
- `ValueError` -- If `convergence_tolerance` is not between 0 and 1.
### Reference
- https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls
### Notes
The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
"""
# Step 1: Pad the spectrum to reduce boundary effects during baseline fitting.
padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode="edge")
# Step 2: Initialize the weights array.
current_weights = np.ones_like(padded_spectrum)
# Step 3: Initialize a difference array for enforcing smoothness
differences: csr_array = eye_array(padded_spectrum.shape[0], dtype=np.float64, format="csr")
for _ in range(differentiation_order):
# Update the difference array to account for higher-order differentials if required
differences = differences[1:] - differences[:-1]
# Step 4: Compute the smoothness penalties using the difference matrix and smoothing factor
smoothness_penalies: csc_array = smoothing_factor * (differences.T @ differences)
smoothness_penalies = smoothness_penalies.tocoo()
smoothness_penalies.sum_duplicates()
# smoothness_penalies must have canonical format, or changing to csr could change
# data order
assert smoothness_penalies.has_canonical_format
initial_diag_index = np.where(smoothness_penalies.row == smoothness_penalies.col)[0]
initial_diag = smoothness_penalies.data[initial_diag_index]
smoothness_penalies = smoothness_penalies.tocsr()
assert (smoothness_penalies.data[initial_diag_index] == smoothness_penalies.diagonal()).all()
A = smoothness_penalies.copy()
# Step 5: Iteratively refine the estimated baseline
baseline = np.zeros_like(padded_spectrum)
for _ in range(max_iterations):
# Step 5a: Generate a diagonal weights array from the current weights
A.data[initial_diag_index] = initial_diag + current_weights
# Step 5b: Solve for the baseline using the current weights and smoothness penalties
b = current_weights * padded_spectrum
baseline[:] = spsolve(A, b, permc_spec="NATURAL")
# Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
residuals = padded_spectrum - baseline
# Step 5d: Calculate statistics of the negative residuals. Negative residuals indicate regions where the baseline may be overestimated
negative_residuals = residuals[residuals < 0]
nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)
# Step 5e: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
# Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
exponents.clip(-500, 500, out=exponents)
updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))
# Step 5f: Check for convergence by measuring how much the weights have changed
if (norm(current_weights - updated_weights) / norm(current_weights)) < convergence_tolerance:
break # Stop iterating if the change in weights is below the specified tolerance
current_weights[:] = updated_weights
# Step 6: Remove the padding and return the baseline corresponding to the original spectrum
return baseline[edge_padding:-edge_padding]
This was roughly 26% faster.
Next, I looked at the creation of the smoothness_penalies matrix. The key thing to note about this matrix is that it depends on three things: the size of the spectrum, the smoothing factor, and the differentiation order. These are the same on every call to arpls(). Since arpls() is called many times, that makes it a good candidate for memoization.
To do that, first move the creation of smoothness_penalies into its own function, then apply the functools.lru_cache() decorator (this works because all three arguments are hashable scalars). That reduces the time spent setting up smoothness_penalies from 16% of the execution time to 0.2%.
import numpy as np
from numpy.typing import NDArray
from scipy.linalg import norm
from scipy.sparse import csc_array, csr_array, dia_array, diags_array, eye_array
from scipy.sparse.linalg import spsolve
from functools import lru_cache
@lru_cache
def initialize_smoothness_penalies(size, differentiation_order, smoothing_factor):
# Step 3: Initialize a difference array for enforcing smoothness
differences: csr_array = eye_array(size, dtype=np.float64, format="csr")
for _ in range(differentiation_order):
# Update the difference array to account for higher-order differentials if required
differences = differences[1:] - differences[:-1]
# Step 4: Compute the smoothness penalties using the difference matrix and smoothing factor
smoothness_penalies: csc_array = smoothing_factor * (differences.T @ differences)
smoothness_penalies = smoothness_penalies.tocoo()
smoothness_penalies.sum_duplicates()
# smoothness_penalies must have canonical format, or changing to csr could change
# data order
assert smoothness_penalies.has_canonical_format
initial_diag_index = np.where(smoothness_penalies.row == smoothness_penalies.col)[0]
initial_diag = smoothness_penalies.data[initial_diag_index]
smoothness_penalies = smoothness_penalies.tocsr()
assert (smoothness_penalies.data[initial_diag_index] == smoothness_penalies.diagonal()).all()
return smoothness_penalies, initial_diag, initial_diag_index
# Sparse array implementation
def arpls(
spectrum: NDArray[np.floating],
smoothing_factor: float = 1E6,
convergence_tolerance: float = 0.05,
differentiation_order: int = 2,
max_iterations: int = 100,
edge_padding: int = 100,
) -> NDArray[np.floating]:
"""
Fits a baseline signal to the given spectrum using Asymmetrically Reweighted Penalized Least Squares smoothing.
### Parameters
- spectrum: `NDArray[np.floating]` -- The 1D spectrum input array.
- smoothing_factor: `float`, optional -- Smoothing strength parameter.\
The larger the parameter, the smoother the resulting baseline.\
High values suppress small fluctuations, which may lead to a smoother but less sensitive baseline.
- convergence_tolerance: `float`, optional -- Convergence criterion based on the change in weights between iterations.\
Must be a value between 0 and 1. Smaller values allow less negative deviations, promoting a more accurate baseline fitting.
- differentiation_order: `int`, optional -- The order of the difference operator used in penalization.\
An order of 1 enforces a linear baseline (smoothing the first derivative), while an order of 2 enforces a quadratic baseline (smoothing the second derivative).\
Higher orders allow for more complex baseline shapes.
- max_iterations: `int`, optional -- Maximum number of iterations to perform.\
The algorithm iteratively refines the baseline until the convergence criterion is met or the maximum number of iterations is reached.
- edge_padding: `int`, optional -- Number of points to pad on both ends of the spectrum to improve baseline fitting at the edges.\
Padding is done using edge values to minimize boundary effects.
### Returns
- `NDArray[np.floating]` -- The fitted baseline signal array of the same length as the input spectrum (after removing padding).
### Raises
- `ValueError` -- If `convergence_tolerance` is not between 0 and 1.
### Reference
- https://irfpy.irf.se/projects/ica/api/api_irfpy.ica.baseline.html#irfpy.ica.baseline.arpls
### Notes
The ARPLS algorithm is particularly useful for baseline correction in spectra where the baseline is not well-defined and varies in a complex way.
The algorithm iteratively adjusts the baseline by penalizing large deviations while allowing for asymmetric weighting, making it robust for various types of spectral data.
"""
# Step 1: Pad the spectrum to reduce boundary effects during baseline fitting.
padded_spectrum = np.pad(spectrum, pad_width=edge_padding, mode="edge")
# Step 2: Initialize the weights array.
current_weights = np.ones_like(padded_spectrum)
smoothness_penalies, initial_diag, initial_diag_index = initialize_smoothness_penalies(padded_spectrum.shape[0], differentiation_order, smoothing_factor)
A = smoothness_penalies.copy()
# Step 5: Iteratively refine the estimated baseline
baseline = np.zeros_like(padded_spectrum)
for _ in range(max_iterations):
# Step 5a: Generate a diagonal weights array from the current weights
A.data[initial_diag_index] = initial_diag + current_weights
# Step 5b: Solve for the baseline using the current weights and smoothness penalties
b = current_weights * padded_spectrum
baseline[:] = spsolve(A, b, permc_spec="NATURAL")
# Step 5c: Calculate residuals (difference between the actual spectrum and the baseline)
residuals = padded_spectrum - baseline
# Step 5d: Calculate statistics of the negative residuals. Negative residuals indicate regions where the baseline may be overestimated
negative_residuals = residuals[residuals < 0]
nr_mean: np.floating = negative_residuals.mean(dtype=np.float64)
nr_deviation: np.floating = negative_residuals.std(dtype=np.float64)
# Step 5e: Update the weights to penalize large deviations while allowing for asymmetry. This step adjusts the baseline fitting to be more sensitive to regions below the baseline
# Potential overflow in the original algorithm: "np.exp(2.0 * (residuals - ((2 * nr_deviation) - nr_mean)) / nr_deviation)"
exponents = 2 * (residuals - (2 * nr_deviation - nr_mean)) / nr_deviation
exponents.clip(-500, 500, out=exponents)
updated_weights = 1.0 / (1.0 + np.exp(exponents, dtype=np.float64))
# Step 5f: Check for convergence by measuring how much the weights have changed
if (norm(current_weights - updated_weights) / norm(current_weights)) < convergence_tolerance:
break # Stop iterating if the change in weights is below the specified tolerance
current_weights[:] = updated_weights
# Step 6: Remove the padding and return the baseline corresponding to the original spectrum
return baseline[edge_padding:-edge_padding]
Combined with the previous optimization, this is roughly 36% faster than the original implementation.
The results are within 1e-7 of the original results, which is driven by changing the A input of spsolve from CSC to CSR.
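(A sketch of such an agreement check, where arpls_original is a hypothetical name for the unmodified implementation above:)
baseline_old = np.apply_along_axis(arpls_original, axis=1, arr=spectra)
baseline_new = np.apply_along_axis(arpls, axis=1, arr=spectra)
print(np.abs(baseline_old - baseline_new).max())  # ~1e-7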
You could rewrite initialize_smoothness_penalies to produce its result in CSC instead, but that does make spsolve slower.
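(That change would be a one-line conversion at the end of initialize_smoothness_penalies; a sketch, where the existing assert would catch any reordering of the stored entries:)
# Hypothetical variant: return the penalty matrix in CSC instead of CSR
smoothness_penalies = smoothness_penalies.tocsc()
assert (smoothness_penalies.data[initial_diag_index] == smoothness_penalies.diagonal()).all()
return smoothness_penalies, initial_diag, initial_diag_index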
Here is a collection of ideas I looked into but was unable to get working. You may have better luck with them.