我正在 python 上处理数千个 csv/feather 文件(每个文件包含 ~xxx)。我最初使用 pandas 来完成任务,但需要很长时间,因此我尝试使用 Polars。
注意:我的代码的 pandas 版本与下面的类似,它工作得很好,但需要很长时间才能完成。
读取/扫描所有 csv/feather 文件,然后将其重新写入为一个镶木地板文件。
但在执行此操作之前,我需要读取所有 csv/feather 文件,然后将其连接到 1 个数据帧中。
问题:当我在 VSCode jupyter 笔记本上运行代码时,但内核不断崩溃
Canceled future for execute_request message before replies were done
The Kernel crashed while executing code in the the current cell or a previous cell. Please review the code in the cell(s) to identify a possible cause of the failure. Click here for more info. View Jupyter log for further details.
问题:如何有效地连接所有文件而不导致内核崩溃?
all_tables = glob.glob(os.path.join(EXPORT_TABLE_FOLDER, "*.{}".format(table_format)))
parquet_name = "simplified_all_spatial_join_data_{}_p0.parquet".format(location_name)
parquet_path = os.path.join(EXPORT_TABLE_FOLDER, parquet_name)
# read the first file
if table_format == 'csv':
temp_all_spatial_join_data = pl.scan_csv(all_tables[0], infer_schema_length=0)
else:
temp_all_spatial_join_data = pl.scan_ipc(all_tables[0], infer_schema_length=0)
# read the rest of the files
if not os.path.exists(parquet_path):
# clone the first scan as a placeholder (to be concatenated later)
collected_temp_all_spatial_join_data = temp_all_spatial_join_data.collect().clone()
# iterate through the files
for table, iteration in tqdm(zip(all_tables[1:], range(len(all_tables[1:]))), total = len(all_tables[1:])):
if table_format == 'csv':
temp = pl.scan_csv(table, infer_schema_length=0)
else:
temp = pl.scan_ipc(table, infer_schema_length=0)
temp_all_spatial_join_data = pl.concat([temp_all_spatial_join_data, temp])
# each 25th iteration, collect the lazyframe as dataframe,
if iteration % 25 == 0:
collected_temp_all_spatial_join_data = pl.concat([
collected_temp_all_spatial_join_data,
temp_all_spatial_join_data.collect()
]).unique()
# then re-assign the temp_all_spatial_join_data as the current file being scanned
if table_format == 'csv':
temp_all_spatial_join_data = pl.scan_csv(table, infer_schema_length=0)
else:
temp_all_spatial_join_data = pl.scan_ipc(table, infer_schema_length=0)
else:
print ('WARNING: This file already exists!\nSkipping the process')
# write the concatenated files into a parquet file
collected_temp_all_spatial_join_data.write_parquet(parquet_path)
我不太确定,但我怀疑这与用于存储 LazyFrames 和查询计划的内存有关。
if iteration % 25 == 0:
部分是我努力最小化用于存储查询计划的内存,方法是将它们划分为 25 个文件的块,将其收集到 DataFrame 中,然后重置查询计划。它适用于较少数量的文件(最多数百个),但是当文件大小达到数千时,即使我将块变小,内核也会不断崩溃。
一些初步印象:
concat
列表即可。 您可以有一个步骤来检查 estimated_size
,这样如果它崩溃了,您就知道要设置断点的大小。所以喜欢:
all_tables = glob.glob(os.path.join(EXPORT_TABLE_FOLDER, "*.{}".format(table_format)))
df=[]
chunk=1
accumsize=0
sizethreshold = 1_000_000_000_000_000 # set this to something smaller than the last print after it crashes
for tablefile in all_tables:
if tablefile.format == 'csv':
temp = pl.read_csv(tablefile, infer_schema_length=0)
else:
temp = pl.read_ipc(tablefile, infer_schema_length=0)
df.append(temp)
accumsize += temp.estimated_size()
if accumsize >= sizethreshold:
df=pl.concat(df)
df.write_parquet(parquet_path.replace(".parquet", f"{chunk}.parquet"))
chunk+=1
df=[]
accumsize=0
else:
print(accumsize)
df=pl.concat(df)
df.write_parquet(parquet_path.replace(".parquet", f"{chunk}.parquet"))
来自极地文档:
重新分块
在串联之前,我们有两个数据帧 df1 和 df2。 df1 和 df2 中的每一列都位于内存中的一个或多个块中。默认情况下,在串联期间,每列中的块不会变得连续。这使得连接操作更快并且消耗更少的内存,但它可能会减慢未来操作的速度,而这些操作将受益于将数据存储在连续内存中。将碎片块复制到单个新块的过程称为重新分块。重新分块是一项昂贵的操作。在版本 0.20.26 之前,默认情况下是执行重新分块,但在新版本中,默认情况下不执行。如果您确实希望 Polars 对连接的 DataFrame 重新分块,请在连接时指定 rechunk = True。
使用 concat 的表达式中需要将 rechunk 参数设置为 True,例如:some_df = pl.concat([df1, df2], rechunk=True)