大家
我在使用 Skin_csv 命令对大于内存的 LazyFrame 与极坐标进行排序时遇到内存问题。
我正在开发一个引擎,它应该能够合并和排序多个大型数据集,这些数据集以不同的 CSV 文件的形式出现。
我正在处理两个 6GB 文件进行合并和排序。我从 8GB RAM 的 Pod 开始,但即使使用 32GB,应用程序也会崩溃。
注意:这些文件不是“传统”csv 文件,从某种意义上说,它们在其组成上具有自定义“标题”和“预告片”记录。每行由“记录代码”(前 2 位数字)标识。不同的记录类型可能有不同数量的字段,因此我不能简单地用普通的“scan_csv”+分隔符读取整个文件。
以下是其中一个文件的示例:
00;XPTO;99999991000197;20240905;130444;000001;20240905;130444;000001;20240901;0000
01;99900000001;F;1;0;000000321
01;00000000001;F;2;0;000000123
01;77700000003;F;0;0;000000999
01;22200000004;F;0;0;000000999
01;12300000004;F;0;0;000000999
99;00000000005;
我的代码的作用:
import polars as pl
import os
# Polars tmp dir should be set to /data since /tmp does not have enough space
os.environ["POLARS_TEMP_DIR"] = "/data/testes/merger/tmp/"
pl.Config.set_verbose(True)
valid_record_codes = ["01", "02", "05", "06"]
sort_column_indexes = [1, 0]
ORIGINAL_ROW_COLUMN = "original_row"
ROW_AS_LIST_COLUMN = "row_as_list"
RECORD_CODE_COLUMN = "record_code"
SEPARATOR = ";"
# Read the input files
lf = pl.scan_csv(
"./part_*",
separator=chr(0000),
has_header=False,
new_columns=[ORIGINAL_ROW_COLUMN],
)
# Add a column on the dataframe for the record_code.
# As for now, the record code is always the first field of the dataframe
lf = lf.select(ORIGINAL_ROW_COLUMN).with_columns(
pl.col(ORIGINAL_ROW_COLUMN).str.split(SEPARATOR).alias(ROW_AS_LIST_COLUMN)
)
# Eliminate undesired records from the dataframe
lf = lf.with_columns(
pl.col(ROW_AS_LIST_COLUMN).list.get(0).alias(RECORD_CODE_COLUMN)
)
lf = lf.filter(pl.col(RECORD_CODE_COLUMN).is_in(valid_record_codes)).select(
pl.col(ORIGINAL_ROW_COLUMN), pl.col(ROW_AS_LIST_COLUMN)
)
sort_columns = list()
# Add the sortable columns to the LazyFrame
for sort_column in sort_column_indexes:
column_name = f"column_{sort_column}"
lf = lf.with_columns(
pl.col(ROW_AS_LIST_COLUMN)
.list.get(sort_column)
.alias(column_name)
)
sort_columns.append(column_name)
# Sort the dataframe
lf = lf.sort(sort_columns).select(ORIGINAL_ROW_COLUMN)
# Write the file
lf.sink_csv("output.csv", include_header=False)
该代码适用于小文件。但对于较大的文件,极性开始消耗大量内存,直到 python 进程终止。
以下是代码的执行输出:
>>> lf.sink_csv("output.csv", include_header=False)
RUN STREAMING PIPELINE
[csv -> hstack -> hstack -> filter -> fast_projection -> hstack -> hstack -> sort_multiple -> fast_projection -> parquet_sink]
STREAMING CHUNK SIZE: 600000 rows
OOC sort started
Temporary directory path in use: /data/testes/merger/tmp/
STREAMING CHUNK SIZE: 600000 rows
finished sinking into OOC sort in 539.231142825s
full file dump of OOC sort took 539.631121035s
spill size: 0 mb
processing 1375 files
Killed
我开始观察这个 Pod 上的内存和磁盘使用情况,这就是我观察到的:
我已经尝试减小 Polars 配置中的块大小(从 600k 到 50k)。这给了我更多的执行时间,但错误仍然发生。
我还尝试将可排序字段转换为 INT64,但在这种情况下,程序甚至在创建所有要排序的部分文件之前就终止了。
还有其他配置参数可以调整以优化内存使用吗?
我想你会想完全避免使用列表。
.str.extract()
并使用正则表达式在索引 N 处提取。
>>> pl.select(pl.lit("01;99900000001;F;1;0;000000321").str.extract(r"[^;]+;([^;]+)")).item()
'99900000001' # index 1
如果我用这种方法替换列表实现:
lf = pl.scan_csv(
"./part_*",
separator=chr(0000),
has_header=False,
new_columns=[ORIGINAL_ROW_COLUMN],
)
lf = lf.filter(
pl.col(ORIGINAL_ROW_COLUMN).str.extract(rf"([^{SEPARATOR}]+)").is_in(valid_record_codes)
)
lf = lf.with_columns(
pl.col(ORIGINAL_ROW_COLUMN).str.extract(
SEPARATOR.join(rf"[^{SEPARATOR}]+" * (index)) # build regex for index N
+
SEPARATOR
+
rf"([^{SEPARATOR}]+)"
)
.alias(f"column_{index}") for index in sort_column_indexes
)
(lf.sort(r"^column_\d+$")
.drop(r"^column_\d+$")
.sink_csv("output.csv", include_header=False)
)
并将输入文件增加到 20_000_000 行 - 其峰值为 5GB RAM。
原始列表版本的 RAM 峰值为 10GB。
这可能取决于平台/规格,但似乎表明它可能是一种改进的方法。