在 python xarray 中,如何在不加载整个数据数组的情况下创建惰性变量并对其进行子集化?

问题描述 投票:0回答:1

我正在尝试创建一个 python 函数,该函数使用 xarray 打开远程数据集(在 opendap 服务器中)并自动延迟创建新变量。一个用例是当 u 和 v 分量可用时计算幅度和方向,例如:

import xarray as xr
import dask
import dask.array as da


@dask.delayed
def uv2mag(u, v):
    return (u**2 + v**2)**0.5


@dask.delayed
def uv2dir(u, v):
    return np.rad2deg(np.arctan2(u, v))


def open_dataset(*args, **kwargs) -> xr.Dataset:

    uv = kwargs.pop("uv", None)

    ds = xr.open_dataset(*args, **kwargs)

    if uv:

        uvar, vvar = uv

        ds["magnitude"] = (
            ds[uvar].dims,
            da.from_delayed(uv2mag(ds[uvar], ds[vvar]),
                            ds[uvar].shape,
                            dtype=ds[uvar].dtype),
            {"long_name": "magnitude"},
        )

        ds["direction"] = (
            ds[uvar].dims,
            da.from_delayed(uv2dir(ds[uvar], ds[vvar]),
                            ds[uvar].shape,
                            dtype=ds[uvar].dtype),
            {"long_name": "direction"},
        )

    return ds


url = "https://tds.hycom.org/thredds/dodsC/FMRC_ESPC-D-V02_uv3z/FMRC_ESPC-D-V02_uv3z_best.ncd"
uvar = "water_u"
vvar = "water_v"

ds = open_dataset(url, drop_variables="tau", uv=[uvar, vvar])

乍一看,一切似乎都运行良好,并且创建了新的幅度和方向变量。

<xarray.Dataset>
Dimensions:      (depth: 40, lat: 4251, lon: 4500, time: 121)
Coordinates:
  * depth        (depth) float64 0.0 2.0 4.0 6.0 ... 2.5e+03 3e+03 4e+03 5e+03
  * lat          (lat) float64 -80.0 -79.96 -79.92 -79.88 ... 89.92 89.96 90.0
  * lon          (lon) float64 0.0 0.07996 0.16 0.24 ... 359.7 359.8 359.8 359.9
  * time         (time) datetime64[ns] 2024-09-29T12:00:00 ... 2024-10-14T12:...
    time_run     (time) datetime64[ns] ...
Data variables:
    time_offset  (time) datetime64[ns] ...
    water_u      (time, depth, lat, lon) float32 ...
    water_v      (time, depth, lat, lon) float32 ...
    magnitude    (time, depth, lat, lon) float32 dask.array<chunksize=(121, 40, 4251, 4500), meta=np.ndarray>
    direction    (time, depth, lat, lon) float32 dask.array<chunksize=(121, 40, 4251, 4500), meta=np.ndarray>
Attributes: (12/22)
    classification_level:      
    distribution_statement:    Approved for public release; distribution unli...
    downgrade_date:            not applicable
    classification_authority:  not applicable
    institution:               Fleet Numerical Meteorology and Oceanography C...
    source:                    HYCOM archive file, GLBz0.04
                       ...
    time_origin:               2024-10-05 12:00:00
    _CoordSysBuilder:          ucar.nc2.dataset.conv.CF1Convention
    cdm_data_type:             GRID
    featureType:               GRID
    location:                  Proto fmrc:FMRC_ESPC-D-V02_uv3z
    history:                   FMRC Best Dataset

当我尝试获取数据的一个非常小的子集时,问题就出现了。 “真实”变量立即返回数据,例如:

>>> ds.isel(time=slice(0, 5), depth=0, lat=1000, lon=1000)["water_u"].values
array([0.27400002, 0.23600002, 0.12      , 0.108     , 0.24400002],
      dtype=float32)

但是,当我尝试从惰性变量获取子集时,由于尝试将整个 4D 矩阵加载到内存中,而不仅仅是子集,因此会引发错误。

>>> ds.isel(time=slice(0, 5), depth=0, lat=1000, lon=1000)["magnitude"].values
MemoryError: Unable to allocate 345. GiB for an array with shape (121, 40, 4251, 4500) and data type float32

有人建议可以修多薄吗?

谢谢你。

我的期望是

isel
执行的子集也将应用于惰性变量,并且内存中的加载将与“真实”变量一样快。

python dask python-xarray opendap thredds
1个回答
0
投票

按照@guillaume-eb的建议,我删除了dask.delayed选项并仅使用简单的Dask数组。该代码现在似乎工作得很好。谢谢你。

import datetime
import numpy as np
import xarray as xr
import dask.array as dask_array


def uv2mag(u: xr.DataArray,
           v: xr.DataArray
           ) -> dask_array:
    return (u.data**2 + v.data**2)**0.5


def uv2dir(u: xr.DataArray,
           v: xr.DataArray
           ) -> dask_array:
    return np.rad2deg(np.arctan2(u.data, v.data))


def open_dataset(*args, **kwargs) -> xr.Dataset:

    uv = kwargs.pop("uv", None)

    ds = xr.open_dataset(*args, **kwargs)

    if uv:

        uvar, vvar = uv

        ds["magnitude"] = (
            ds[uvar].dims,
            uv2mag(ds[uvar], ds[vvar]),
            {"long_name": "magnitude"},
        )

        ds["direction"] = (
            ds[uvar].dims,
            uv2dir(ds[uvar], ds[vvar]),
            {"long_name": "direction"},
        )

    return ds


url = "https://tds.hycom.org/thredds/dodsC/FMRC_ESPC-D-V02_uv3z/FMRC_ESPC-D-V02_uv3z_best.ncd"
uvar = "water_u"
vvar = "water_v"

chunks = {"time": 10,
          "depth": 10,
          "latitude": 1,
          "longitude": 1}

ds = open_dataset(url, drop_variables="tau", chunks=10, uv=[uvar, vvar])
ds2 = ds.isel(time=slice(0, 5), depth=0, lat=1000, lon=1000)
ds2.load()
© www.soinside.com 2019 - 2024. All rights reserved.