如何优雅地预分配 numpy 数组?

问题描述 投票:0回答:2

这主要是针对 Numpy 数组的,但我觉得这是一个普遍的设计问题。在许多情况下,我在科学计算中遇到以下普遍问题:我必须读取由某个向量场的多个时间实例组成的数据集,例如许多点上的时变速度场。可能是大小为

(Npoints, 3, Ntimesteps)
的 3D 数组(3 因为我有一个向量的三个分量)。我通常每个时间步有一个文件,所以我必须读取
Ntimesteps
文件,但我事先不知道每个字段的大小(即我事先不知道
Npoints
,但对于每个文件)。如果我想预先分配 Numpy 数组来存储这些数据,我习惯于按照以下方式做一些事情:

tsteps = list(glob.glob('time*.csv'))
Nsteps = len(tsteps)
with open(tsteps[0], 'r') as f:
    # do my work to get Npoints and the data of the first file
    dataset = np.zeros(Npoints, 3, Nsteps)
    dataset[:,:,0] = data_first_file
for i,f in enumerate(tsteps[1:]):
    # do my work again
    dataset[:,:,i] = data_from_tstep_i

但是,这看起来不太“优雅”,因为我必须编写两次代码来处理每个时间步文件:一次用于预分配数组,一次在循环中。有更优雅的模式来做到这一点吗?

python arrays numpy
2个回答
1
投票

我认为如果你事先不知道形状,就没有优雅的方法来做到这一点。但为了可读性,您应该在循环内启动它

tsteps = glob.glob('time*.csv') 
for i, filename in enumerate(tsteps):
    with open(filename) as f:
        data_i = np.loadtxt(f)
        if i==0:
            dataset = np.zeros(data_i.shape[0], 3, len(tsteps))
        dataset[:,:,i] = data_i

1
投票

python 3.11.5numpy 1.25.2

优雅的问题取决于品味和背景。不过,我相信至少可以使这项工作更加准确。为了实现这一目标,我将其分为三个步骤:

  1. 从给定文件中提取数据
  2. 从文件生成数据流
  3. 收集数据

每个步骤可以以不同的方式实现,但关键思想在于这种分离。例如:

def load_from_file(file_name):
    data = None
    with open(file_name, 'r') as f:
        # mock data
        npoints, vector_dim = 10, 3
        data = np.arange(npoints*vector_dim).reshape(npoints, vector_dim)
    return data    
    
def fetch_data(source_dir, *, pattern='time*.csv'):
    from pathlib import Path
    for source in Path(source_dir).glob(pattern):
        if source.is_file(): 
            yield load_from_file(source)
        
data = np.r_['2,3,0', *fetch_data('.', pattern='*')]    
            # 2     - stack along the axis 2
            #   3   - force each part into 3 dimensions
            #     0 - place actual dimentions at the beginning

如果某些步骤的逻辑很简单,我们可以将它们组合起来。例如,这是一个很好的场景:

dtype, source, pattern = int, '.', '*.bin'
data = np.stack([np.fromfile(f, dtype).reshape(-1, 3)
                 for f in Path(source).glob(pattern)], axis=2)

如果提取数据比较复杂,但迭代文件仍然很简单,那么:

def get_data(file):
    ...

data = np.stack([get_data(f) for f in Path(source).glob(pattern)
                 if f.is_file()], axis=2)

无论您选择哪种方式,我相信三步模型都有助于您了解整个过程。

注意: 在上面的示例中使用 numpy.stack 以及 numpy.r_ 可确保结果数据为

C_CONTIGUOUS


附注如果我们决定在逐层填充数据之前为数据分配内存,我们将这个逻辑封装在第三步中。例如:

import numpy as np

def load_from_file(file_name, *, columns=None, **np_fromfile_kwargs):
    data = None
    with open(file_name, 'r') as f:
        data = np.fromfile(file_name, **np_fromfile_kwargs)
        match columns:
            case int(n):
                data = data.reshape(-1, n)
            case tuple(shape) | list(shape):
                data = data.reshape(shape)
    return data    
    
def fetch_data(source_dir, *, pattern='*', key=None, reverse=False,
               **loading_settings) -> (int, Iterator):
    from pathlib import Path
    files = [f for f in Path(source_dir).glob(pattern) if f.is_file()]
    files.sort(key=key, reverse=reverse)
    return len(files), (load_from_file(f, **loading_settings) for f in files)

def collect_data(source_dir, **settings):
    nlayers, data_layers = fetch_data(source_dir, **settings)
    if nlayers == 0:
        return None
    first_layer = next(data_layers)
    assert first_layer is not None
    shape = *first_layer.shape, nlayers
    dtype = first_layer.dtype
    data = np.empty(shape, dtype)
    data[..., 0] = first_layer
    for i, layer in enumerate(data_layers, 1):
        data[..., i] = layer if layer is not None else 0
    return data

测试用例:

from pathlib import Path
from tempfile import TemporaryDirectory

with TemporaryDirectory() as source_dir:
    source_dir = Path(source_dir)
    base = np.arange(10*3)
    for n in range(20):
        (base + n).tofile(source_dir/f'time{n:02}.bin')

    fetching_settings = dict(
        pattern='time*.bin', 
        key=lambda x: int(Path(x).stem.removeprefix('time')))
    loading_settings = dict(
        dtype=int, 
        columns=3)

    data = collect_data(source_dir, **(fetching_settings | loading_settings))

print(f'{data.shape = }')
print(f'{data[..., 15] = }')

# data.shape = (10, 3, 20)
# data[..., 15] = array(
#       [[15, 16, 17],
#        [18, 19, 20],
#        [21, 22, 23],
#        [24, 25, 26],
#        [27, 28, 29],
#        [30, 31, 32],
#        [33, 34, 35],
#        [36, 37, 38],
#        [39, 40, 41],
#        [42, 43, 44]])
© www.soinside.com 2019 - 2024. All rights reserved.