I've found that np.hstack on a list of arrays is the bottleneck in my pipeline, and I'm hoping there is a more efficient way to perform the concatenation.
Here is example code that produces the input to the hstack step:
import numpy as np

matrix = np.zeros([5, 4])
list_arrays = np.hsplit(matrix, matrix.shape[1])
# ----- perform some (parallel) operation on columns
matrix = np.hstack(list_arrays)  # --> I would like to optimize this
Any ideas?
As already noted in the comments, it is hard to offer a tailored solution without knowing exactly what the parallel part of your code does. That said, in the code below I assume you want to compute a softmax over each column, and I provide five different implementations:

- calculate_with_split(): uses a ThreadPoolExecutor on each column after splitting, as you mentioned in the comments on the question. I believe this is closest to what you are actually doing.
- calculate_with_pool(): uses a ThreadPoolExecutor on each column, but without splitting the input.
- calculate_with_loop(): uses a loop over the columns, processing them one at a time.
- calculate_vectorized(): uses NumPy's vectorization.
- calculate_in_place(): uses NumPy's vectorization and computes the result in place, i.e. it modifies the given array.

from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait
from timeit import Timer
import numpy as np
num_workers = 16
num_timings = 1000
shape_matrix = 500, 40
def normalize(data: np.ndarray, axis: int | None = None):
    # Compute a numerically stable softmax for normalization
    numerator = np.exp(data - data.max(axis=axis, keepdims=True))
    return numerator / numerator.sum(axis=axis, keepdims=True)

def calculate_with_split(matrix):
    # Split into single-column arrays, normalize each in its own task, then hstack
    list_arrays = np.hsplit(matrix, matrix.shape[1])
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        fs = [executor.submit(normalize, col) for col in list_arrays]
        wait(fs, timeout=None, return_when=ALL_COMPLETED)
    list_arrays = [f.result() for f in fs]
    return np.hstack(list_arrays)

def calculate_with_pool(matrix):
    # Normalize each column in its own task, writing into a preallocated result
    result = np.empty_like(matrix)
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        fs = [executor.submit(normalize, col) for col in matrix.T]
        wait(fs, timeout=None, return_when=ALL_COMPLETED)
    for i, f in enumerate(fs):
        result[:, i] = f.result()
    return result

def calculate_with_loop(matrix):
    # Normalize the columns one at a time in a plain Python loop
    result = np.empty_like(matrix)
    for i in range(matrix.shape[-1]):
        result[:, i] = normalize(matrix[:, i])
    return result

def calculate_vectorized(matrix):
    # Normalize all columns at once via broadcasting
    return normalize(matrix, axis=0)

def calculate_in_place(matrix):
    # Same as the vectorized version, but overwrite the given array in place
    np.subtract(matrix, matrix.max(axis=0, keepdims=True), out=matrix)
    np.exp(matrix, out=matrix)
    np.divide(matrix, matrix.sum(axis=0, keepdims=True), out=matrix)

rand = np.random.default_rng(seed=42)
matrix = rand.normal(size=shape_matrix)
result_split = calculate_with_split(matrix)
result_pool = calculate_with_pool(matrix)
result_loop = calculate_with_loop(matrix)
result_vect = calculate_vectorized(matrix)
matrix_copy = matrix.copy()
calculate_in_place(matrix_copy)
result_in_place = matrix_copy
assert np.allclose(result_split, result_pool)
assert np.allclose(result_split, result_loop)
assert np.allclose(result_split, result_vect)
assert np.allclose(result_split, result_in_place)
for fct in calculate_with_split, calculate_with_pool, calculate_with_loop, calculate_vectorized, calculate_in_place:
    print(f"{fct.__name__}(): {Timer(lambda: fct(matrix)).timeit(num_timings):.3f} seconds")
As a result, I get:
calculate_with_split(): 3.698 seconds
calculate_with_pool(): 3.500 seconds
calculate_with_loop(): 0.619 seconds
calculate_vectorized(): 0.210 seconds
calculate_in_place(): 0.186 seconds
My conclusion from this: if your problem is reasonably close in size and complexity to the code shared above, and if you manage to rewrite it accordingly, then pretty much any of the alternatives will be faster than the split-and-merge approach.
That said, I may well be misjudging your problem, since it is still not clear to me what you are actually doing. In any case, perhaps the code above helps you experiment with and optimize a solution to your actual problem.
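If the per-column work really does need to run as separate tasks, one further option (not benchmarked above) is to sidestep the hstack entirely by preallocating the output and letting every task write into its own column slice. Below is a minimal sketch of that idea; process_column and calculate_without_hstack are hypothetical placeholders for whatever your parallel step actually computes, not part of the code above.

from concurrent.futures import ThreadPoolExecutor

import numpy as np


def process_column(col: np.ndarray) -> np.ndarray:
    # Hypothetical stand-in for the real per-column computation
    return col * 2


def calculate_without_hstack(matrix: np.ndarray, num_workers: int = 16) -> np.ndarray:
    # Preallocate the output so no concatenation step is needed afterwards
    result = np.empty_like(matrix)

    def worker(i: int) -> None:
        # Each task writes its finished column directly into the shared output
        result[:, i] = process_column(matrix[:, i])

    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        # Consuming the map iterator waits for all tasks and surfaces exceptions
        list(executor.map(worker, range(matrix.shape[1])))
    return result

Whether this pays off still depends on the per-column work releasing the GIL (many NumPy operations do) and on the columns being large enough to amortize the thread overhead; for the small columns in the timings above, the vectorized and in-place variants remain the fastest.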