在 ubuntu 中使用 scipy.optimize 和 scikitlearn.dbscan 时,多处理卡住了

问题描述 投票:0回答:1

我正在使用 scipy 优化库的“SLSQP”方法在 Python 中执行优化。为了提高雅可比行列式的计算速度,对成本函数应用了多重处理。在这个成本函数中,包含了 scikit-learn 的 DBSCAN 聚类算法。

问题是,在执行多处理时,当我尝试运行 DBSCAN 时,进程挂起。这种现象仅在成本函数与多处理并行时出现,而在使用 ThreadPool 时不会出现。

具体来说,该问题仅出现在Ubuntu环境下;在Windows环境下可以正常工作。我想要一些关于这些环境差异如何影响问题以及为什么 DBSCAN 执行在多处理期间挂起的建议。

import numpy as np
from scipy.optimize import minimize
from sklearn.cluster import DBSCAN
from multiprocessing import Pool

def cost_function(x):
    print(f"Running cost function for x: {x}")
    clustering = DBSCAN().fit(np.array([x]))
    cost = np.sum(x**2)
    print(f"Cost for x: {x} is {cost}")
    return cost

def gradient_function(x):
    print(f"Calculating gradient for x: {x}")
    with Pool() as pool:
        gradients = pool.map(cost_function, [x])
    print(f"Gradient for x: {x} is {gradients}")
    return np.array(gradients)

x0 = np.array([0.5, 0.5])
print("Starting optimization...")
result = minimize(cost_function, x0, method='SLSQP', jac=gradient_function)

print("Optimization result:", result)

这是简单的故障排除代码。也卡住了。 和控制台

Starting optimization...
Running cost function for x: [0.5 0.5]
Cost for x: [0.5 0.5] is 0.5
Calculating gradient for x: [0.5 0.5]
Running cost function for x: [0.5 0.5]

大概,optimize(scipy) 使用 blas 执行并行处理以实现快速计算。 dbscan(scikitlearn)表示使用OpenMp进行并行处理。在并行处理中创建进程并在其中使用 OpenMp 执行并行处理是否是问题的原因?

python scikit-learn scipy multiprocessing scipy-optimize-minimize
1个回答
0
投票

这个程序有三件事不能合并在一个Python程序中。

  1. 多重处理。
  2. 父进程内的多线程。
  3. fork启动方法。

您需要删除三个中的一个。

为了解释这一点,我们先从fork启动方法开始。在多处理中,它需要一种启动新进程的方法。在 Linux 上执行此操作的默认方法是 fork。 fork() 系统调用在内存中创建进程的精确副本,然后多处理使用其中一个进程来完成工作,另一个进程(父进程)创建更多进程。

其中一个重要细节是 fork() 与多线程交互的方式。 Fork 会复制线程持有的锁,但不会复制线程本身。因此,如果 fork() 碰巧在父进程中的另一个线程持有锁时运行,则子进程的锁最终可能会永久持有。

为什么在 Windows 上不会发生这种情况? Windows 不支持 fork()。因此,多处理使用不同的启动方法,称为spawn。生成不是复制父进程,而是从一个新进程开始,然后加载所有代码。虽然从性能角度来看这并不理想,但这确实意味着父进程持有的锁不会影响子进程。

所以这给了我们可能的修复#1:使用

multiprocessing.set_start_method('spawn')
更改启动方法以生成。

import numpy as np
from scipy.optimize import minimize
from sklearn.cluster import DBSCAN
import multiprocessing
from multiprocessing import Pool

def cost_function(x):
    print(f"Running cost function for x: {x}")
    clustering = DBSCAN().fit(np.array([x]))
    cost = np.sum(x**2)
    print(f"Cost for x: {x} is {cost}")
    return cost


def gradient_function(x):
    print(f"Calculating gradient for x: {x}")
    with Pool() as pool:
        gradients = pool.map(cost_function, [x + [0, 0.01], x + [0.01, 0]])
    print(f"Gradient for x: {x} is {gradients}")
    return np.array(gradients)

if __name__ == "__main__":
    multiprocessing.set_start_method('spawn')
    x0 = np.array([0.5, 0.5])
    print("Starting optimization...")
    result = minimize(cost_function, x0, method='SLSQP', jac=gradient_function)

    print("Optimization result:", result)

注意:如果使用这种方法,则需要避免在函数或

if __name__ == "__main__":
之外使用任何代码,否则它将在 spawn 创建子进程时运行。

您可以采取的另一种方法是删除多线程。这给了我们可能的修复#2:将

OMP_NUM_THREADS
环境变量设置为1以禁用DBSCAN内的多线程。

import os
os.environ['OMP_NUM_THREADS'] = '1'
import numpy as np
from scipy.optimize import minimize
from sklearn.cluster import DBSCAN
from multiprocessing import Pool


def cost_function(x):
    print(f"Running cost function for x: {x}")
    clustering = DBSCAN().fit(np.array([x]))
    cost = np.sum(x**2)
    print(f"Cost for x: {x} is {cost}")
    return cost


def gradient_function(x):
    print(f"Calculating gradient for x: {x}")
    with Pool() as pool:
        gradients = pool.map(cost_function, [x + [0, 0.01], x + [0.01, 0]])
    print(f"Gradient for x: {x} is {gradients}")
    return np.array(gradients)

if __name__ == "__main__":
    x0 = np.array([0.5, 0.5])
    print("Starting optimization...")
    result = minimize(cost_function, x0, method='SLSQP', jac=gradient_function)

    print("Optimization result:", result)

警告:设置环境变量和导入 DBSCAN 的顺序很重要。如果先导入DBSCAN,它将忽略OMP_NUM_THREADS。另一种更简洁的方法是使用线程池库。我有一篇博客文章解释了如何使用线程池以及它有什么优点

但是,DBSCAN 可能需要多线程。但问题不在于使用多线程。问题是在父进程内使用多线程。如果父进程从不运行 DBSCAN,那么它永远不会在父进程内部进行多线程处理。

所以这给了我们可能的修复#3:仅在子进程内运行cost_function()。

import numpy as np
from scipy.optimize import minimize
from sklearn.cluster import DBSCAN
from multiprocessing import Pool


def cost_function(x):
    print(f"Running cost function for x: {x}")
    clustering = DBSCAN().fit(np.array([x]))
    cost = np.sum(x**2)
    print(f"Cost for x: {x} is {cost}")
    return cost


def cost_function_subproc(x):
    # Only launch 1 process - that's all we need.
    with Pool(1) as pool:
        return pool.map(cost_function, [x])[0]


def gradient_function(x):
    print(f"Calculating gradient for x: {x}")
    with Pool() as pool:
        gradients = pool.map(cost_function, [x + [0, 0.01], x + [0.01, 0]])
    print(f"Gradient for x: {x} is {gradients}")
    return np.array(gradients)

if __name__ == "__main__":
    x0 = np.array([0.5, 0.5])
    print("Starting optimization...")
    result = minimize(cost_function_subproc, x0, method='SLSQP', jac=gradient_function)

    print("Optimization result:", result)
© www.soinside.com 2019 - 2024. All rights reserved.