我有一个以自定义格式存储的大型分片数据集,这将大大受益于
dask.dataframe.from_delayed
但是,在尝试保留结果数据帧时,我看到了奇怪的行为:
def load(file):
# Just an example...Actual loading code is more complex.
return pd.read_parquet(file)
filenames = ['my_file']
df = dd.from_delayed([delayed(load)(f) for f in filenames])
df = df.persist()
print(df.count().compute())
这会导致两个连续的“加载”任务,每个任务从头开始从网络加载数据:一次在调用 .persist() 时,一次在持久数据帧上运行计算时。
我期望只有一个“加载”任务,然后计算将在持久数据帧上进行。
我确认了
df = dd.read_parquet('my_file')
df = df.persist()
print(df.count().compute())
仅正确安排一个 read_parquet 任务,因此数据仅从网络加载一次。
此问题是否有解决方法,以确保调用 .persist 后,不会从网络重新下载数据?
当您创建 dask 数据框时,dask 需要知道列和类型,以便它可以推断您可能对其执行的进一步惰性操作。在
from_delayed
中,你有机会用 meta=
参数来提供这个东西。如果您不提供它,dask将加载第一个分区来推断它,然后丢弃该数据 - 正如您所说,这可能会很昂贵。
所以,答案是:提供
meta=
;要么您可以知道它,要么找到一种方法仅加载第一个分区的一部分以在运行时推断它。