聚合多个 Parquet 文件时,使用 Polars 在每个文件的基础上添加日期列

问题描述 投票:0回答:1

我有大量的 Parquet 数据文件,我可以通过 Polars 很好地加入和聚合它们,执行类似的操作(注意文件名中的 glob):

(
    pl.scan_parquet('data/data-16828*.parquet')
    .group_by('type_id', 'location_id')
    .agg(
        pl.min('n').alias('n_min'),
        pl.max('n').alias('n_max')
    )
    .collect()
)

每个文件都是每五分钟运行一次的脚本的输出,我的目标是利用它们创建一个时间序列数据帧。有一个类型为

date
datetime[μs, UTC]
列。但是,我发现该列的值在单个文件中并不相等,而是反映了运行期间创建行的确切时间。因此,
date
列对于分组来说毫无用处。

按照我的看法,我可能应该添加一个新列,并在每个文件的基础上使用第一行的

date
值填充它。是否可以使用 Polars 的惰性 API 来实现此目的,还是我必须在使用 Polars 运行聚合之前先修复文件?

请注意,我需要使用惰性 API,因为数据集比内存大得多。

parquet python-polars
1个回答
1
投票

lazyframe 没有任何关于文件来源的信息。因此,您需要将迭代移出极坐标,以便您可以自己将文件信息提供给惰性框架。

类似这样的:

lazydf=[]
from pathlib import Path
basepath=Path('data/')
for myfile in basepath.iterdir():
    if not "data-16828" in myfile.name or myfile.suffix!='.parquet': continue 
    lazydf.append((
        pl.scan_parquet(myfile)
        .group_by('type_id', 'location_id')
        .agg(
            pl.min('n').alias('n_min'),
            pl.max('n').alias('n_max')
        )
        .with_columns(source_file=pl.lit(myfile.name))
    ))
pl.concat(lazydf)

这不会捕获第一行方面,因为您需要更改 group_by/agg 模型并使用窗口函数,以便每一列都有自己的分组,如下所示:

lazydf=[]
from pathlib import Path
basepath=Path('data/')
for myfile in basepath.iterdir():
    if not "data-16828" in myfile.name or myfile.suffix!='.parquet': continue 
    lazydf.append((
        pl.scan_parquet(myfile)
        .select('type_id',
                'location_id',
                n_min=pl.col('n').min().over('type_id','location_id'),
                n_max=pl.col('n').max().over('type_id','location_id'),
                date=pl.col('date').first())
        .unique(subset=['type_id','location_id','n_min','n_max','date'])
    ))
pl.concat(lazydf)
© www.soinside.com 2019 - 2024. All rights reserved.