我有大量的 Parquet 数据文件,我可以通过 Polars 很好地加入和聚合它们,执行类似的操作(注意文件名中的 glob):
(
pl.scan_parquet('data/data-16828*.parquet')
.group_by('type_id', 'location_id')
.agg(
pl.min('n').alias('n_min'),
pl.max('n').alias('n_max')
)
.collect()
)
每个文件都是每五分钟运行一次的脚本的输出,我的目标是利用它们创建一个时间序列数据帧。有一个类型为
date
的 datetime[μs, UTC]
列。但是,我发现该列的值在单个文件中并不相等,而是反映了运行期间创建行的确切时间。因此,date
列对于分组来说毫无用处。
按照我的看法,我可能应该添加一个新列,并在每个文件的基础上使用第一行的
date
值填充它。是否可以使用 Polars 的惰性 API 来实现此目的,还是我必须在使用 Polars 运行聚合之前先修复文件?
请注意,我需要使用惰性 API,因为数据集比内存大得多。
lazyframe 没有任何关于文件来源的信息。因此,您需要将迭代移出极坐标,以便您可以自己将文件信息提供给惰性框架。
类似这样的:
lazydf=[]
from pathlib import Path
basepath=Path('data/')
for myfile in basepath.iterdir():
if not "data-16828" in myfile.name or myfile.suffix!='.parquet': continue
lazydf.append((
pl.scan_parquet(myfile)
.group_by('type_id', 'location_id')
.agg(
pl.min('n').alias('n_min'),
pl.max('n').alias('n_max')
)
.with_columns(source_file=pl.lit(myfile.name))
))
pl.concat(lazydf)
这不会捕获第一行方面,因为您需要更改 group_by/agg 模型并使用窗口函数,以便每一列都有自己的分组,如下所示:
lazydf=[]
from pathlib import Path
basepath=Path('data/')
for myfile in basepath.iterdir():
if not "data-16828" in myfile.name or myfile.suffix!='.parquet': continue
lazydf.append((
pl.scan_parquet(myfile)
.select('type_id',
'location_id',
n_min=pl.col('n').min().over('type_id','location_id'),
n_max=pl.col('n').max().over('type_id','location_id'),
date=pl.col('date').first())
.unique(subset=['type_id','location_id','n_min','n_max','date'])
))
pl.concat(lazydf)