python Polars - 连接数千个 csv/feather 文件时内核不断崩溃

问题描述 投票:0回答:2

我正在 python 上处理数千个 csv/feather 文件(每个文件包含 ~xxx)。我最初使用 pandas 来完成任务,但需要很长时间,因此我尝试使用 Polars。
注意:我的代码的 pandas 版本与下面的类似,它工作得很好,但需要很长时间才能完成。

目标

读取/扫描所有 csv/feather 文件,然后将其重新写入为一个镶木地板文件。
但在执行此操作之前,我需要读取所有 csv/feather 文件,然后将其连接到 1 个数据帧中。

问题与疑问

问题:当我在 VSCode jupyter 笔记本上运行代码时,但内核不断崩溃

Canceled future for execute_request message before replies were done
The Kernel crashed while executing code in the the current cell or a previous cell. Please review the code in the cell(s) to identify a possible cause of the failure. Click here for more info. View Jupyter log for further details.

问题:如何有效地连接所有文件而不导致内核崩溃?

当前工作流程

  1. 从文件夹中读取或扫描文件
  2. 将文件连接到下一个文件
all_tables = glob.glob(os.path.join(EXPORT_TABLE_FOLDER, "*.{}".format(table_format)))

parquet_name = "simplified_all_spatial_join_data_{}_p0.parquet".format(location_name)
parquet_path = os.path.join(EXPORT_TABLE_FOLDER, parquet_name)

# read the first file
if table_format == 'csv':
    temp_all_spatial_join_data = pl.scan_csv(all_tables[0], infer_schema_length=0)
else:
    temp_all_spatial_join_data = pl.scan_ipc(all_tables[0], infer_schema_length=0)

# read the rest of the files
if not os.path.exists(parquet_path):
    # clone the first scan as a placeholder (to be concatenated later)
    collected_temp_all_spatial_join_data = temp_all_spatial_join_data.collect().clone()
    
    # iterate through the files
    for table, iteration in tqdm(zip(all_tables[1:], range(len(all_tables[1:]))), total = len(all_tables[1:])):
        if table_format == 'csv':
            temp = pl.scan_csv(table, infer_schema_length=0)
        else:
            temp = pl.scan_ipc(table, infer_schema_length=0)

        temp_all_spatial_join_data = pl.concat([temp_all_spatial_join_data, temp])

        # each 25th iteration, collect the lazyframe as dataframe,
        if iteration % 25 == 0:
            collected_temp_all_spatial_join_data = pl.concat([
                collected_temp_all_spatial_join_data,
                temp_all_spatial_join_data.collect()
                ]).unique()
            
            # then re-assign the temp_all_spatial_join_data as the current file being scanned
            if table_format == 'csv':
                temp_all_spatial_join_data = pl.scan_csv(table, infer_schema_length=0)
            else:
                temp_all_spatial_join_data = pl.scan_ipc(table, infer_schema_length=0)

else:
    print ('WARNING: This file already exists!\nSkipping the process')

# write the concatenated files into a parquet file
collected_temp_all_spatial_join_data.write_parquet(parquet_path)

我不太确定,但我怀疑这与用于存储 LazyFrames 和查询计划的内存有关。

if iteration % 25 == 0:
部分是我努力最小化用于存储查询计划的内存,方法是将它们划分为 25 个文件的块,将其收集到 DataFrame 中,然后重置查询计划。它适用于较少数量的文件(最多数百个),但是当文件大小达到数千时,即使我将块变小,内核也会不断崩溃。

python csv parquet python-polars feather
2个回答
1
投票

一些初步印象:

  1. 我不认为惰性框架对你有帮助。在某些时候,它们需要进入内存,并且由于您没有过滤或子集化列,所以我不确定节省的费用从何而来。
  2. 您不需要每一步都进行连接。 只需将每个文件添加到列表中并在末尾添加
    concat
    列表即可。 您可以有一个步骤来检查
    estimated_size
    ,这样如果它崩溃了,您就知道要设置断点的大小。

所以喜欢:

all_tables = glob.glob(os.path.join(EXPORT_TABLE_FOLDER, "*.{}".format(table_format)))
df=[]
chunk=1
accumsize=0
sizethreshold = 1_000_000_000_000_000 # set this to something smaller than the last print after it crashes
for tablefile in all_tables:
    if tablefile.format == 'csv':
        temp = pl.read_csv(tablefile, infer_schema_length=0)
    else:
        temp = pl.read_ipc(tablefile, infer_schema_length=0)
    df.append(temp)
    accumsize += temp.estimated_size()
    if accumsize >= sizethreshold:
        df=pl.concat(df)
        df.write_parquet(parquet_path.replace(".parquet", f"{chunk}.parquet"))
        chunk+=1
        df=[]
        accumsize=0
    else:
        print(accumsize)
df=pl.concat(df)
df.write_parquet(parquet_path.replace(".parquet", f"{chunk}.parquet"))
    

0
投票

来自极地文档:

重新分块

在串联之前,我们有两个数据帧 df1 和 df2。 df1 和 df2 中的每一列都位于内存中的一个或多个块中。默认情况下,在串联期间,每列中的块不会变得连续。这使得连接操作更快并且消耗更少的内存,但它可能会减慢未来操作的速度,而这些操作将受益于将数据存储在连续内存中。将碎片块复制到单个新块的过程称为重新分块。重新分块是一项昂贵的操作。在版本 0.20.26 之前,默认情况下是执行重新分块,但在新版本中,默认情况下不执行。如果您确实希望 Polars 对连接的 DataFrame 重新分块,请在连接时指定 rechunk = True。

使用 concat 的表达式中需要将 rechunk 参数设置为 True,例如:some_df = pl.concat([df1, df2], rechunk=True)

© www.soinside.com 2019 - 2024. All rights reserved.