使用“dask.delayed”将大于内存的xarray数据集存储到zarr,而不会耗尽内存

问题描述 投票:0回答:1

我正在尝试使用dask在xarray中进行大于内存的处理。具体来说,我正在尝试:

  • 按时间连接多个 NetCDF 文件(在同一地理网格上,相同的变量)
  • 将它们重新网格到不同的网格
  • 将它们保存为
    zarr
    存储,分块,以便按时只有一个块。

(生成的 zarr 存储随后将用于进一步分析,这是沿着时间维度完成的,因此需要沿着时间进行分块)。

但是,我在设置工作流程时遇到了麻烦,而又不会导致

ds.to_zarr()
调用中内存使用量激增。

我正在尝试遵循 Dask 最佳实践(尤其是this)。工作流程的简化版本是:

import xarray as xr
import numpy as np
import xesmf as xe
from distributed import Client

# Start dask client
client = Client()
display(client)

@dask.delayed
def load(fn_list):
   ds = xr.open_mfdataset(fn_list)
   return ds

@dask.delayed
def process(ds):
   # Do something to dataset, e.g., regridding
   ref_grid = xr.Dataset(coords = {'lat':np.arange(-89.5,89.6),
                                   'lon':np.arange(-179.5,179.6)})
   rgrd = xe.Regridder(ds,ref_grid,'conservative')
   
   ds = rgrd(ds)
   return ds

def workflow(fn_list):
   ds = load(fn_list)
   
   ds = process(ds)

   # Rechunk
   ds = ds.chunk({'time':-1,'lat':12,'lon':12})

   delayed = dask.delayed(ds.to_zarr)('test.zarr')
   return delayed

out = dask.compute(workflow)
dask.compute(out)

根据我通过研究这个问题收集到的信息,任务图的设置方式会导致当

dask.compute()
到达
.to_zarr()
调用时加载整个数组并将其发送给一个工作人员。

我想我的主要问题是 - 为什么

.to_zarr()
调用需要内存中的所有内容/如何设置它而不需要内存中的所有内容?

版本:

zarr == 2.18.3
xarray == 2024.9.0
dask == 2024.9.1
python dask python-xarray zarr
1个回答
0
投票

对此进行了精彩的讨论这里

© www.soinside.com 2019 - 2024. All rights reserved.