我正在使用 scipy 优化库的“SLSQP”方法在 Python 中执行优化。为了提高雅可比行列式的计算速度,对成本函数应用了多重处理。在这个成本函数中,包含了 scikit-learn 的 DBSCAN 聚类算法。
问题是,在执行多处理时,当我尝试运行 DBSCAN 时,进程挂起。这种现象仅在成本函数与多处理并行时出现,而在使用 ThreadPool 时不会出现。
具体来说,该问题仅出现在Ubuntu环境下;在Windows环境下可以正常工作。我想要一些关于这些环境差异如何影响问题以及为什么 DBSCAN 执行在多处理期间挂起的建议。
import numpy as np
from scipy.optimize import minimize
from sklearn.cluster import DBSCAN
from multiprocessing import Pool
def cost_function(x):
print(f"Running cost function for x: {x}")
clustering = DBSCAN().fit(np.array([x]))
cost = np.sum(x**2)
print(f"Cost for x: {x} is {cost}")
return cost
def gradient_function(x):
print(f"Calculating gradient for x: {x}")
with Pool() as pool:
gradients = pool.map(cost_function, [x])
print(f"Gradient for x: {x} is {gradients}")
return np.array(gradients)
x0 = np.array([0.5, 0.5])
print("Starting optimization...")
result = minimize(cost_function, x0, method='SLSQP', jac=gradient_function)
print("Optimization result:", result)
这是简单的故障排除代码。也卡住了。 和控制台
Starting optimization...
Running cost function for x: [0.5 0.5]
Cost for x: [0.5 0.5] is 0.5
Calculating gradient for x: [0.5 0.5]
Running cost function for x: [0.5 0.5]
大概,optimize(scipy) 使用 blas 执行并行处理以实现快速计算。 dbscan(scikitlearn)表示使用OpenMp进行并行处理。在并行处理中创建进程并在其中使用 OpenMp 执行并行处理是否是问题的原因?
这个程序有三件事不能合并在一个Python程序中。
您需要删除三个中的一个。
为了解释这一点,我们先从fork启动方法开始。在多处理中,它需要一种启动新进程的方法。在 Linux 上执行此操作的默认方法是 fork。 fork() 系统调用在内存中创建进程的精确副本,然后多处理使用其中一个进程来完成工作,另一个进程(父进程)创建更多进程。
其中一个重要细节是 fork() 与多线程交互的方式。 Fork 会复制线程持有的锁,但不会复制线程本身。因此,如果 fork() 碰巧在父进程中的另一个线程持有锁时运行,则子进程的锁最终可能会永久持有。
为什么在 Windows 上不会发生这种情况? Windows 不支持 fork()。因此,多处理使用不同的启动方法,称为spawn。生成不是复制父进程,而是从一个新进程开始,然后加载所有代码。虽然从性能角度来看这并不理想,但这确实意味着父进程持有的锁不会影响子进程。
所以这给了我们可能的修复#1:使用
multiprocessing.set_start_method('spawn')
更改启动方法以生成。
import numpy as np
from scipy.optimize import minimize
from sklearn.cluster import DBSCAN
import multiprocessing
from multiprocessing import Pool
def cost_function(x):
print(f"Running cost function for x: {x}")
clustering = DBSCAN().fit(np.array([x]))
cost = np.sum(x**2)
print(f"Cost for x: {x} is {cost}")
return cost
def gradient_function(x):
print(f"Calculating gradient for x: {x}")
with Pool() as pool:
gradients = pool.map(cost_function, [x + [0, 0.01], x + [0.01, 0]])
print(f"Gradient for x: {x} is {gradients}")
return np.array(gradients)
if __name__ == "__main__":
multiprocessing.set_start_method('spawn')
x0 = np.array([0.5, 0.5])
print("Starting optimization...")
result = minimize(cost_function, x0, method='SLSQP', jac=gradient_function)
print("Optimization result:", result)
注意:如果使用这种方法,则需要避免在函数或
if __name__ == "__main__":
之外使用任何代码,否则它将在 spawn 创建子进程时运行。
您可以采取的另一种方法是删除多线程。这给了我们可能的修复#2:将
OMP_NUM_THREADS
环境变量设置为1以禁用DBSCAN内的多线程。
import os
os.environ['OMP_NUM_THREADS'] = '1'
import numpy as np
from scipy.optimize import minimize
from sklearn.cluster import DBSCAN
from multiprocessing import Pool
def cost_function(x):
print(f"Running cost function for x: {x}")
clustering = DBSCAN().fit(np.array([x]))
cost = np.sum(x**2)
print(f"Cost for x: {x} is {cost}")
return cost
def gradient_function(x):
print(f"Calculating gradient for x: {x}")
with Pool() as pool:
gradients = pool.map(cost_function, [x + [0, 0.01], x + [0.01, 0]])
print(f"Gradient for x: {x} is {gradients}")
return np.array(gradients)
if __name__ == "__main__":
x0 = np.array([0.5, 0.5])
print("Starting optimization...")
result = minimize(cost_function, x0, method='SLSQP', jac=gradient_function)
print("Optimization result:", result)
警告:设置环境变量和导入 DBSCAN 的顺序很重要。如果先导入DBSCAN,它将忽略OMP_NUM_THREADS。另一种更简洁的方法是使用线程池库。我有一篇博客文章解释了如何使用线程池以及它有什么优点。
但是,DBSCAN 可能需要多线程。但问题不在于使用多线程。问题是在父进程内使用多线程。如果父进程从不运行 DBSCAN,那么它永远不会在父进程内部进行多线程处理。
所以这给了我们可能的修复#3:仅在子进程内运行cost_function()。
import numpy as np
from scipy.optimize import minimize
from sklearn.cluster import DBSCAN
from multiprocessing import Pool
def cost_function(x):
print(f"Running cost function for x: {x}")
clustering = DBSCAN().fit(np.array([x]))
cost = np.sum(x**2)
print(f"Cost for x: {x} is {cost}")
return cost
def cost_function_subproc(x):
# Only launch 1 process - that's all we need.
with Pool(1) as pool:
return pool.map(cost_function, [x])[0]
def gradient_function(x):
print(f"Calculating gradient for x: {x}")
with Pool() as pool:
gradients = pool.map(cost_function, [x + [0, 0.01], x + [0.01, 0]])
print(f"Gradient for x: {x} is {gradients}")
return np.array(gradients)
if __name__ == "__main__":
x0 = np.array([0.5, 0.5])
print("Starting optimization...")
result = minimize(cost_function_subproc, x0, method='SLSQP', jac=gradient_function)
print("Optimization result:", result)