如何使用Python的多处理库并行化迭代深度数组的for循环?

问题描述 投票:0回答:1

我使用的测井数据具有与不同深度相关的不同值。我经常需要迭代深度、执行计算并将这些值存储在之前创建的数组中以创建新的测井曲线。它相当于下面的例子。

import numpy as np

depth = np.linspace(5000,6000,2001)
y1 = np.random.random((len(depth),3))
y2 = np.random.random(len(depth))

def fun_1(y1, y2):
    return y1 + y2

def fun_2(y1, y2):
    return sum(y1 * y2)

result_1 = np.zeros_like(y1)
result_2 = np.zeros_like(y2)

for i in range(len(depth)):
    prov_result_1 = fun_1(y1[i], y2[i])
    prov_result_2 = fun_2(y1[i], y2[i])
    
    result_1[i,:] = prov_result_1
    result_2[i]   = prov_result_2

在示例中,我使用 y1 和 y2 值逐个深度计算预备值 prov_result_1 和 prov_result_2,然后将其存储在 result_1 和 result_2 的相应索引中,在循环结束时生成最终的 result_1 和 result_2 曲线。很容易看出,根据深度数组的大小和我在每个深度应用的函数,事情可能会失控,并且此代码可能需要几个小时才能完成。请记住,y1 和 y2 数组可以更大,并且我应用的函数比这些复杂得多。

我想知道是否有一种方法可以使用Python的多处理库来并行化这个for循环。我在 StackOverflow 上找到的其他答案并不能直接转化为这个问题,而且似乎总是比应有的复杂得多。

我想象的一个选择是将深度除以处理器的数量,并并行执行相同的循环,例如:

num_pool = 2
depth_cut = (depth[-1]-depth[0])/num_pool
depth_parallel = [depth[depth <= depth[0] + depth_cut],
                  depth[depth >  depth[0] + depth_cut]]
y1_parallel = [y1[depth <= depth[0] + depth_cut],
               y1[depth >  depth[0] + depth_cut]]
y2_parallel = [y2[depth <= depth[0] + depth_cut],
               y2[depth >  depth[0] + depth_cut]]

只是更通用。然后我会将这些数据放入并行处理中,执行计算,然后再次连接所有内容。

python multithreading numpy performance multiprocessing
1个回答
0
投票

那么你可以:

  • 使用
    multiprocessing
    库,
  • 创建两个并行函数来分别处理块

总体思路是为每个函数创建一个单独的进程,并在每个函数进程内一一处理块。

在我看来,这是一种控制与这两种功能相关的差异化治疗速度的简单方法。

建议的代码

import numpy as np
from multiprocessing import Pool

def fun_1(args):
    y1_i, y2_i = args
    return y1_i + y2_i

def fun_2(args):
    y1_i, y2_i = args
    return sum(y1_i * y2_i)

def separate_process(func, y1, y2, num_pool):
    """Separated process to deal chunks with one function at a time"""
    # Create a pool of workers
    pool = Pool(num_pool)
    # Divide the data into chunks for treatment 
    # (good thing to do with large datasets)
    
    ### Assuming len(y1) = len(y2)
    chunk_size = len(y1) // num_pool
    chunks_y1 = [y1[i:i + chunk_size] for i in range (0, len(y1), chunk_size)]
    chunks_y2 = [y2[i:i + chunk_size] for i in range (0, len(y2), chunk_size)]

    # Apply function for each chunk in parallel
    results = pool.map(func, zip(chunks_y1, chunks_y2))
    # Avoid 'float' exception
    results = [result.tolist() if isinstance(result, np.ndarray) else [result] for result in results]
    # Close the pool
    pool.close()
    # Wait for the process to finish to keep control
    pool.join()

    # Concatenation : chunks treatment strategy is not supposed to be visible at the end
    return results

depth = np.linspace(5000, 6000, 2001)
y1 = np.random.random(len(depth))
y2 = np.random.random(len(depth))

num_pool = 2

fun_1_res = separate_process(fun_1, y1, y2, num_pool)
fun_2_res = separate_process(fun_2, y1, y2, num_pool)

print(len(fun_1_res))
### 3
print(len(fun_2_res))
### 3
© www.soinside.com 2019 - 2024. All rights reserved.