我使用的测井数据具有与不同深度相关的不同值。我经常需要迭代深度、执行计算并将这些值存储在之前创建的数组中以创建新的测井曲线。它相当于下面的例子。
import numpy as np
depth = np.linspace(5000,6000,2001)
y1 = np.random.random((len(depth),3))
y2 = np.random.random(len(depth))
def fun_1(y1, y2):
return y1 + y2
def fun_2(y1, y2):
return sum(y1 * y2)
result_1 = np.zeros_like(y1)
result_2 = np.zeros_like(y2)
for i in range(len(depth)):
prov_result_1 = fun_1(y1[i], y2[i])
prov_result_2 = fun_2(y1[i], y2[i])
result_1[i,:] = prov_result_1
result_2[i] = prov_result_2
在示例中,我使用 y1 和 y2 值逐个深度计算预备值 prov_result_1 和 prov_result_2,然后将其存储在 result_1 和 result_2 的相应索引中,在循环结束时生成最终的 result_1 和 result_2 曲线。很容易看出,根据深度数组的大小和我在每个深度应用的函数,事情可能会失控,并且此代码可能需要几个小时才能完成。请记住,y1 和 y2 数组可以更大,并且我应用的函数比这些复杂得多。
我想知道是否有一种方法可以使用Python的多处理库来并行化这个for循环。我在 StackOverflow 上找到的其他答案并不能直接转化为这个问题,而且似乎总是比应有的复杂得多。
我想象的一个选择是将深度除以处理器的数量,并并行执行相同的循环,例如:
num_pool = 2
depth_cut = (depth[-1]-depth[0])/num_pool
depth_parallel = [depth[depth <= depth[0] + depth_cut],
depth[depth > depth[0] + depth_cut]]
y1_parallel = [y1[depth <= depth[0] + depth_cut],
y1[depth > depth[0] + depth_cut]]
y2_parallel = [y2[depth <= depth[0] + depth_cut],
y2[depth > depth[0] + depth_cut]]
只是更通用。然后我会将这些数据放入并行处理中,执行计算,然后再次连接所有内容。
那么你可以:
multiprocessing
库,总体思路是为每个函数创建一个单独的进程,并在每个函数进程内一一处理块。
在我看来,这是一种控制与这两种功能相关的差异化治疗速度的简单方法。
建议的代码
import numpy as np
from multiprocessing import Pool
def fun_1(args):
y1_i, y2_i = args
return y1_i + y2_i
def fun_2(args):
y1_i, y2_i = args
return sum(y1_i * y2_i)
def separate_process(func, y1, y2, num_pool):
"""Separated process to deal chunks with one function at a time"""
# Create a pool of workers
pool = Pool(num_pool)
# Divide the data into chunks for treatment
# (good thing to do with large datasets)
### Assuming len(y1) = len(y2)
chunk_size = len(y1) // num_pool
chunks_y1 = [y1[i:i + chunk_size] for i in range (0, len(y1), chunk_size)]
chunks_y2 = [y2[i:i + chunk_size] for i in range (0, len(y2), chunk_size)]
# Apply function for each chunk in parallel
results = pool.map(func, zip(chunks_y1, chunks_y2))
# Avoid 'float' exception
results = [result.tolist() if isinstance(result, np.ndarray) else [result] for result in results]
# Close the pool
pool.close()
# Wait for the process to finish to keep control
pool.join()
# Concatenation : chunks treatment strategy is not supposed to be visible at the end
return results
depth = np.linspace(5000, 6000, 2001)
y1 = np.random.random(len(depth))
y2 = np.random.random(len(depth))
num_pool = 2
fun_1_res = separate_process(fun_1, y1, y2, num_pool)
fun_2_res = separate_process(fun_2, y1, y2, num_pool)
print(len(fun_1_res))
### 3
print(len(fun_2_res))
### 3