我正在尝试使用dask在xarray中进行大于内存的处理。具体来说,我正在尝试:
zarr
存储,分块,以便按时只有一个块。(生成的 zarr 存储随后将用于进一步分析,这是沿着时间维度完成的,因此需要沿着时间进行分块)。
但是,我在设置工作流程时遇到了麻烦,而又不会导致
ds.to_zarr()
调用中内存使用量激增。
我正在尝试遵循 Dask 最佳实践(尤其是this)。工作流程的简化版本是:
import xarray as xr
import numpy as np
import xesmf as xe
from distributed import Client
# Start dask client
client = Client()
display(client)
@dask.delayed
def load(fn_list):
ds = xr.open_mfdataset(fn_list)
return ds
@dask.delayed
def process(ds):
# Do something to dataset, e.g., regridding
ref_grid = xr.Dataset(coords = {'lat':np.arange(-89.5,89.6),
'lon':np.arange(-179.5,179.6)})
rgrd = xe.Regridder(ds,ref_grid,'conservative')
ds = rgrd(ds)
return ds
def workflow(fn_list):
ds = load(fn_list)
ds = process(ds)
# Rechunk
ds = ds.chunk({'time':-1,'lat':12,'lon':12})
delayed = dask.delayed(ds.to_zarr)('test.zarr')
return delayed
out = dask.compute(workflow)
dask.compute(out)
根据我通过研究这个问题收集到的信息,任务图的设置方式会导致当
dask.compute()
到达 .to_zarr()
调用时加载整个数组并将其发送给一个工作人员。
我想我的主要问题是 - 为什么
.to_zarr()
调用需要内存中的所有内容/如何设置它而不需要内存中的所有内容?
版本:
zarr == 2.18.3
xarray == 2024.9.0
dask == 2024.9.1
对此进行了精彩的讨论这里。