This is mainly about NumPy arrays, but I feel it is a general design question. I regularly run into the following situation in scientific computing: I have to read a dataset consisting of several time instances of some vector field, e.g. a time-varying velocity field sampled at many points. That would be a 3D array of size (Npoints, 3, Ntimesteps) (3 because each vector has three components). I usually have one file per time step, so I have to read Ntimesteps files, but I do not know the size of each field in advance (i.e. I do not know Npoints beforehand, only once a file is open). If I want to preallocate the NumPy array that stores this data, I usually do something along these lines:
tsteps = glob.glob('time*.csv')  # glob.glob already returns a list
Nsteps = len(tsteps)
with open(tsteps[0], 'r') as f:
    ...  # do my work to get Npoints and the data of the first file
dataset = np.zeros((Npoints, 3, Nsteps))  # np.zeros expects the shape as a tuple
dataset[:, :, 0] = data_first_file
for i, f in enumerate(tsteps[1:], start=1):  # start=1: slot 0 is already filled
    ...  # do my work again
    dataset[:, :, i] = data_from_tstep_i
However, this does not look very "elegant", because I have to write the per-time-step reading code twice: once to preallocate the array and once inside the loop. Is there a more elegant pattern for this?
I don't think there is an elegant way to do this if you don't know the shape in advance. But for readability you should initialize it inside the loop:
tsteps = glob.glob('time*.csv')
for i, filename in enumerate(tsteps):
    with open(filename) as f:
        data_i = np.loadtxt(f, delimiter=',')  # CSV input, so set the delimiter explicitly
    if i == 0:
        dataset = np.zeros((data_i.shape[0], 3, len(tsteps)))
    dataset[:, :, i] = data_i
(Python 3.11.5, NumPy 1.25.2)
Whether something is elegant is a matter of taste and context. Still, I believe the work can at least be made more precise. To achieve that, I split it into three steps:

1. load the data from a single file;
2. fetch the data by iterating over all the files;
3. collect the pieces into the resulting array.

Each step can be implemented in different ways, but the key idea lies in this separation. For example:
import numpy as np

def load_from_file(file_name):
    data = None
    with open(file_name, 'r') as f:
        # mock data instead of real parsing
        npoints, vector_dim = 10, 3
        data = np.arange(npoints*vector_dim).reshape(npoints, vector_dim)
    return data

def fetch_data(source_dir, *, pattern='time*.csv'):
    from pathlib import Path
    for source in Path(source_dir).glob(pattern):
        if source.is_file():
            yield load_from_file(source)
data = np.r_['2,3,0', *fetch_data('.', pattern='*')]
# The '2,3,0' directive means:
# 2 - stack along axis 2
# 3 - force each part to be at least 3-dimensional
# 0 - place the actual dimensions at the beginning
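To illustrate what the '2,3,0' directive does, here is a minimal, self-contained sketch with toy arrays (the shapes are made up for demonstration):

import numpy as np

a = np.zeros((10, 3))  # one "time step": (Npoints, 3)
b = np.ones((10, 3))

# Each 2-D part is upgraded to shape (10, 3, 1), then concatenated along axis 2.
data = np.r_['2,3,0', a, b]
print(data.shape)  # (10, 3, 2)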
If the logic of some steps is trivial, we can combine them. For example, this is a favorable scenario:
import numpy as np
from pathlib import Path

dtype, source, pattern = int, '.', '*.bin'
data = np.stack([np.fromfile(f, dtype).reshape(-1, 3)
                 for f in Path(source).glob(pattern)], axis=2)
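Since the question reads CSV files rather than raw binary ones, the same one-liner can presumably be adapted by swapping np.fromfile for np.loadtxt; a sketch, assuming comma-delimited files of Npoints rows by 3 columns:

import numpy as np
from pathlib import Path

source, pattern = '.', 'time*.csv'
# sorted() keeps the time steps in a deterministic order
data = np.stack([np.loadtxt(f, delimiter=',')
                 for f in sorted(Path(source).glob(pattern))], axis=2)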
If extracting the data is more involved, but iterating over the files is still simple, then:
def get_data(file):
    ...

data = np.stack([get_data(f) for f in Path(source).glob(pattern)
                 if f.is_file()], axis=2)
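For instance, a hypothetical get_data for the question's CSV files might look like the following (the comma delimiter and the (Npoints, 3) layout are assumptions about the input, not something the question specifies):

import numpy as np

def get_data(file):
    # Assumed format: comma-delimited text, Npoints rows of 3 components each.
    return np.loadtxt(file, delimiter=',')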
Whichever way you choose, I believe the three-step model helps to keep the whole process in view.
Note: using numpy.stack as well as numpy.r_ in the examples above guarantees that the resulting data is C_CONTIGUOUS.
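If in doubt, this is easy to verify on the result through the array's flags attribute:

print(data.flags['C_CONTIGUOUS'])  # True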
P.S. If we decide to allocate memory for the data before filling it layer by layer, we encapsulate that logic in the third step. For example:
import numpy as np
from collections.abc import Iterator

def load_from_file(file_name, *, columns=None, **np_fromfile_kwargs):
    # np.fromfile takes the path directly; no need to open the file first
    data = np.fromfile(file_name, **np_fromfile_kwargs)
    match columns:
        case int(n):                      # number of columns given
            data = data.reshape(-1, n)
        case tuple(shape) | list(shape):  # a full shape given
            data = data.reshape(shape)
    return data
def fetch_data(source_dir, *, pattern='*', key=None, reverse=False,
               **loading_settings) -> tuple[int, Iterator]:
    from pathlib import Path
    files = [f for f in Path(source_dir).glob(pattern) if f.is_file()]
    files.sort(key=key, reverse=reverse)
    return len(files), (load_from_file(f, **loading_settings) for f in files)
def collect_data(source_dir, **settings):
    nlayers, data_layers = fetch_data(source_dir, **settings)
    if nlayers == 0:
        return None
    # The first layer determines the shape and dtype of the preallocated array.
    first_layer = next(data_layers)
    assert first_layer is not None
    shape = *first_layer.shape, nlayers
    dtype = first_layer.dtype
    data = np.empty(shape, dtype)
    data[..., 0] = first_layer
    for i, layer in enumerate(data_layers, 1):
        data[..., i] = layer if layer is not None else 0
    return data
Test case:
from pathlib import Path
from tempfile import TemporaryDirectory

with TemporaryDirectory() as source_dir:
    source_dir = Path(source_dir)
    base = np.arange(10*3)
    for n in range(20):
        (base + n).tofile(source_dir/f'time{n:02}.bin')

    fetching_settings = dict(
        pattern='time*.bin',
        key=lambda x: int(Path(x).stem.removeprefix('time')))
    loading_settings = dict(
        dtype=int,
        columns=3)

    data = collect_data(source_dir, **(fetching_settings | loading_settings))
    print(f'{data.shape = }')
    print(f'{data[..., 15] = }')
# data.shape = (10, 3, 20)
# data[..., 15] = array(
# [[15, 16, 17],
# [18, 19, 20],
# [21, 22, 23],
# [24, 25, 26],
# [27, 28, 29],
# [30, 31, 32],
# [33, 34, 35],
# [36, 37, 38],
# [39, 40, 41],
# [42, 43, 44]])