我必须在大量数据(5 000 000)上使用 curve_fit numpy 函数。 所以基本上我创建了一个二维数组。第一个维度是要执行的拟合数量,第二个维度是用于拟合的点数。
t = np.array([0 1 2 3 4])
for d in np.ndindex(data.shape[0]):
try:
popt, pcov = curve_fit(func, t, np.squeeze(data[d,:]), p0=[1000,100])
except RuntimeError:
print("Error - curve_fit failed")
多处理可以用来加速整个过程,但它仍然很慢。 有没有办法以“矢量化”方式使用 curve_fit ?
加快速度的一种方法是在 curve_fit 中添加一些先验知识。
如果您知道期望参数的范围,并且不需要达到第 100 位有效数字的精度,则可以大大加快计算速度。
这是一个示例,您将适合
param1
和 param2
:
t = np.array([0 1 2 3 4])
def func(t, param1, param2):
return param1*t + param2*np.exp(t)
for d in np.ndindex(data.shape[0]):
try:
popt, pcov = curve_fit(func, t, np.squeeze(data[d,:]), p0=[1000,100],
bounds=([min_param1, min_param2],[max_param1, max_param2]),
ftol=0.5, xtol=0.5)
except RuntimeError:
print("Error - curve_fit failed")
注意额外的关键参数
bounds
、ftol
和 xtol
。您可以在这里阅读有关它们的信息。
scipy.optimize.leastsq
的功能,
lmdif
本身是底层 MINPACK
lmder
和
func
fortran 例程的包装器。看起来多线程是不可能的,看看这个link,上面写着,底层 Fortran 77 例程(MINPACK lmder.f 和 lmdif.f)不是 可重入,因此 GIL 无法释放。 (因此没有机会并行 使用线程处理。)
仍然有一个开放的ticket来开发这个,但看起来它无法完成......您要么需要使用不同的库,要么在较低级别的代码中编写包装器/函数。有关于并行 Levenberg-Marquardt 算法实现的论文
。也许还有另一种解决方案,使用更少的数据或作为粗略估计,您可以将数据随机分成几个部分,在单独的线程(使用多处理器)上对每个部分进行曲线拟合,并取系数的平均值结束。
根据我的经验,如果可能的话,您应该为 curve_fit 提供雅可比行列式。通过避免一次又一次调用
import numpy as np
import multiprocess as mp
from scipy.optimize import curve_fit
from timeit import default_timer as timer
oneArr = np.load('test.npy')
rowCnt = oneArr.shape[0]
xres, yres = 2, 2
xy = np.array([[-xres, yres], [0, yres], [xres, yres], [-xres, 0], [0, 0], [xres, 0], [-xres, -yres], [0, -yres], [xres, -yres]]).T
funcQuadratic = lambda xy, a, b, c, d, e, f: a * xy[0, :] ** 2 + b * xy[1, :] ** 2 + c * xy[0, :] * xy[1, :] + d * xy[0, :] + e * xy[1, :] + f
## Sequential for loop method
tic = timer()
result_scipy_for_loop = np.zeros((rowCnt, 6))
for row in range(rowCnt):
result_scipy_for_loop[row, :] = curve_fit(f = funcQuadratic, xdata = xy, ydata = oneArr[row, :], p0 = (1, 1, 1, 1, 1, 1), method = 'lm')[0]
tac = timer()
print("result_scipy_for_loop is:", result_scipy_for_loop, "and its time usage is:", tac - tic, "seconds")
## Multiprocessing Process() function
def curve_fit_process(queue, rowRng):
result = [curve_fit(f = funcQuadratic, xdata = xy, ydata = oneArr[idx, :], p0 = (1, 1, 1, 1, 1, 1), method = 'lm')[0] for idx in rowRng]
queue.put(result)
q = mp.Queue()
tic = timer()
num_processes = 30
processes = []
if (rowCnt % num_processes) != 0:
chunks = [np.arange(idx * (rowCnt // num_processes + 1), idx * (rowCnt // num_processes + 1) + rowCnt // num_processes + 1) for idx in range(num_processes)]
for chunk in chunks:
process = mp.Process(target = curve_fit_process, args = (q, chunk, ))
processes.append(process)
process.start()
print("process: ", process.name, '->', process.pid, "starts...")
ret = [q.get() for process in processes]
print("ret is:", ret)
for process in processes:
process.join()
else:
chunks = [np.arange(idx * (rowCnt // num_processes), idx * (rowCnt // num_processes) + rowCnt // num_processes) for idx in range(num_processes)]
for chunk in chunks:
process = mp.Process(target = curve_fit_process, args = (q, chunk, ))
processes.append(process)
process.start()
print("process: ", process.name, '->', process.pid, "starts...")
ret = [q.get() for process in processes]
print("ret is:", ret)
for process in processes:
process.join()
tac = timer()
print("mp.Process() function time usage is:", tac - tic, "seconds.")
来计算雅可比,可以节省时间。它会给您带来显着的速度提升,特别是当您处理大量可优化参数时。看来 @Bhanuday Sharma 是对的,如果不编写低级代码,它不能极大地提高 curve_fit 函数性能并大大减少计算时间。
但是,通过利用 multiprocessing 模块,特别是 multiprocessing.Process() 函数,可以大大提升其性能,减少约 100% 的计算时间。在超过一百万个数据集进行拟合测试时,为 76%。
由于您没有提供原始数据(5 000 000),因此我上传了一个包含1314300行的测试文件(“test.npy”,可以从以下网站访问:https://www.mediafire.com/ file/fa5nq6nycvawa4w/test.npy/file
)进行性能比较。另外,你的预定义函数并不复杂,在现实中并不适用。假设我们有一个二次函数要拟合,其形式为 f(x, y) = ax^2 + by^2 + cxy + dx + ey + f,我们准备使用一百万+数据进行拟合。代码部分:
multiprocessing.Process()
在我的例子中,与传统的
for
循环样式(491.4169644650028秒)相比,它只使用了119.21885330599616秒。希望它对将来的人有用。