更有效地执行np.hstack

问题描述 投票:0回答:1

我发现对数组列表的 np.hstack 操作是我的管道中的瓶颈,我希望有一种更有效的方法来执行串联。

这里是获取 hstack 步骤输入的示例代码:

matrix = np.zeros([5,4])
list_arrays=np.hsplit(matrix, matrix.shape[1])
#----- perform some (parallel) operation on columns
matrix = np.hstack(list_arrays) #--> I would like to optimize this

有什么想法吗?

python numpy concatenation
1个回答
0
投票

正如已经评论过的,如果不知道代码的并行部分到底做了什么,就很难提供量身定制的解决方案。也就是说,在下面的代码中,我假设您想要计算每列的 softmax,我提供了 5 种不同的实现:

  1. calculate_with_split()
    :在每列上使用
    ThreadPoolExecutor
    ,就像您在对该问题的评论中提到的那样。我认为这最接近您实际所做的事情。
  2. calculate_with_pool()
    :在每列上使用
    ThreadPoolExecutor
    ,但不拆分输入。
  3. calculate_with_loop()
    :对列使用循环,逐个处理它们。
  4. calculate_vectorized()
    :使用 Numpy 的矢量化功能。
  5. calculate_in_place()
    :使用 Numpy 的向量化功能并就地计算结果,即更改给定的数组。
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait
from timeit import Timer
import numpy as np

num_workers = 16
num_timings = 1000
shape_matrix = 500, 40

def normalize(data: np.ndarray, axis: int | None = None):
    # Compute a numerically stable softmax for normalization
    numerator = np.exp(data - data.max(axis=axis, keepdims=True))
    return numerator / numerator.sum(axis=axis, keepdims=True)

def calculate_with_split(matrix):
    list_arrays = np.hsplit(matrix, matrix.shape[1])
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        fs = [executor.submit(normalize, col) for col in list_arrays]
        wait(fs, timeout=None, return_when=ALL_COMPLETED)
        list_arrays = [f.result() for f in fs]
    return np.hstack(list_arrays)

def calculate_with_pool(matrix):
    result = np.empty_like(matrix)
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        fs = [executor.submit(normalize, col) for col in matrix.T]
        wait(fs, timeout=None, return_when=ALL_COMPLETED)
        for i, f in enumerate(fs):
            result[:, i] = f.result()
    return result

def calculate_with_loop(matrix):
    result = np.empty_like(matrix)
    for i in range(matrix.shape[-1]):
        result[:, i] = normalize(matrix[:, i])
    return result

def calculate_vectorized(matrix):
    return normalize(matrix, axis=0)

def calculate_in_place(matrix):
    np.subtract(matrix, matrix.max(axis=0, keepdims=True), out=matrix)
    np.exp(matrix, out=matrix)
    np.divide(matrix, matrix.sum(axis=0, keepdims=True), out=matrix)

rand = np.random.default_rng(seed=42)
matrix = rand.normal(size=shape_matrix)

result_split = calculate_with_split(matrix)
result_pool = calculate_with_pool(matrix)
result_loop = calculate_with_loop(matrix)
result_vect = calculate_vectorized(matrix)

matrix_copy = matrix.copy()
calculate_in_place(matrix_copy)
result_in_place = matrix_copy

assert np.allclose(result_split, result_pool)
assert np.allclose(result_split, result_loop)
assert np.allclose(result_split, result_vect)
assert np.allclose(result_split, result_in_place)

for fct in calculate_with_split, calculate_with_pool, calculate_with_loop, calculate_vectorized, calculate_in_place:
    print(f"{fct.__name__}(): {Timer(lambda: fct(matrix)).timeit(num_timings):.3f} seconds")

结果,我得到:

calculate_with_split(): 3.698 seconds
calculate_with_pool(): 3.500 seconds
calculate_with_loop(): 0.619 seconds
calculate_vectorized(): 0.210 seconds
calculate_in_place(): 0.186 seconds

我从中得出的结论:如果您的问题在维度和复杂性上与上面共享的代码非常相似,并且如果您设法重写您的问题,那么几乎任何解决方案都会比使用拆分和合并方法更快.

不过,我可能会误判你的问题,因为我仍然不清楚你实际上在做什么。无论如何,也许上面的代码可以帮助您尝试和优化实际问题的解决方案。

© www.soinside.com 2019 - 2024. All rights reserved.