使用极坐标排序大于内存文件的内存问题

问题描述 投票:0回答:1

大家

我在使用 Skin_csv 命令对大于内存的 LazyFrame 与极坐标进行排序时遇到内存问题。

我正在开发一个引擎,它应该能够合并和排序多个大型数据集,这些数据集以不同的 CSV 文件的形式出现。

我正在处理两个 6GB 文件进行合并和排序。我从 8GB RAM 的 Pod 开始,但即使使用 32GB,应用程序也会崩溃。

注意:这些文件不是“传统”csv 文件,从某种意义上说,它们在其组成上具有自定义“标题”和“预告片”记录。每行由“记录代码”(前 2 位数字)标识。不同的记录类型可能有不同数量的字段,因此我不能简单地用普通的“scan_csv”+分隔符读取整个文件。

以下是其中一个文件的示例:

00;XPTO;99999991000197;20240905;130444;000001;20240905;130444;000001;20240901;0000
01;99900000001;F;1;0;000000321
01;00000000001;F;2;0;000000123
01;77700000003;F;0;0;000000999
01;22200000004;F;0;0;000000999
01;12300000004;F;0;0;000000999
99;00000000005;

我的代码的作用:

  1. 创建一个包含所有要合并部分的 LazyFrame
  2. “按原样”创建一个包含原始行的列,因为我不应该在输出文件上以任何方式更改其格式或内容。
  3. 创建一个新列,使用提供的分隔符 (;) 拆分列表中的原始行。
  4. 从数据帧中过滤不需要的记录代码(通过过滤列表第一个字段的值)
  5. 通过读取“列表”列上的索引,为“可排序”字段创建新列。
  6. 使用 pl.sink_csv 将数据帧排序到新文件中
import polars as pl
import os

# Polars tmp dir should be set to /data since /tmp does not have enough space
os.environ["POLARS_TEMP_DIR"] = "/data/testes/merger/tmp/"
pl.Config.set_verbose(True)  

valid_record_codes = ["01", "02", "05", "06"]
sort_column_indexes = [1, 0]

ORIGINAL_ROW_COLUMN = "original_row"
ROW_AS_LIST_COLUMN = "row_as_list"
RECORD_CODE_COLUMN = "record_code"
SEPARATOR = ";"

# Read the input files
lf = pl.scan_csv(
    "./part_*",
    separator=chr(0000),
    has_header=False,
    new_columns=[ORIGINAL_ROW_COLUMN],
)

# Add a column on the dataframe for the record_code. 
# As for now, the record code is always the first field of the dataframe
lf = lf.select(ORIGINAL_ROW_COLUMN).with_columns(
    pl.col(ORIGINAL_ROW_COLUMN).str.split(SEPARATOR).alias(ROW_AS_LIST_COLUMN)
)

# Eliminate undesired records from the dataframe
lf = lf.with_columns(
    pl.col(ROW_AS_LIST_COLUMN).list.get(0).alias(RECORD_CODE_COLUMN)
)



lf = lf.filter(pl.col(RECORD_CODE_COLUMN).is_in(valid_record_codes)).select(
    pl.col(ORIGINAL_ROW_COLUMN), pl.col(ROW_AS_LIST_COLUMN)
)

sort_columns = list()

# Add the sortable columns to the LazyFrame
for sort_column in sort_column_indexes:
    column_name = f"column_{sort_column}"
    lf = lf.with_columns(
            pl.col(ROW_AS_LIST_COLUMN)
            .list.get(sort_column)
            .alias(column_name)
        )
    sort_columns.append(column_name)

# Sort the dataframe
lf = lf.sort(sort_columns).select(ORIGINAL_ROW_COLUMN)

# Write the file
lf.sink_csv("output.csv", include_header=False)

该代码适用于小文件。但对于较大的文件,极性开始消耗大量内存,直到 python 进程终止。

以下是代码的执行输出:

>>> lf.sink_csv("output.csv", include_header=False)
RUN STREAMING PIPELINE
[csv -> hstack -> hstack -> filter -> fast_projection -> hstack -> hstack -> sort_multiple -> fast_projection -> parquet_sink]
STREAMING CHUNK SIZE: 600000 rows
OOC sort started
Temporary directory path in use: /data/testes/merger/tmp/
STREAMING CHUNK SIZE: 600000 rows
finished sinking into OOC sort in 539.231142825s
full file dump of OOC sort took 539.631121035s
spill size: 0 mb
processing 1375 files
Killed

我开始观察这个 Pod 上的内存和磁盘使用情况,这就是我观察到的:

  • 在消息“STREAMING CHUNK SIZE: 600000 rows”之后,内存使用量开始增长,直到达到 ~20GB 的峰值。
  • 然后它开始创建临时文件。在这一步中,内存使用量急剧下降(<600MB). The tmp files sums up to ~87GB on the disk (!!!)
  • 创建所有临时文件后(在“处理 1375 个文件”消息之后),我可以观察到 tmp 目录大小减小,内存使用量再次增长。
  • 当内存达到峰值 ~32GB 时,会出现“Killed”消息,此时 tmp 目录的容量约为 70GB。

我已经尝试减小 Polars 配置中的块大小(从 600k 到 50k)。这给了我更多的执行时间,但错误仍然发生。

我还尝试将可排序字段转换为 INT64,但在这种情况下,程序甚至在创建所有要排序的部分文件之前就终止了。

还有其他配置参数可以调整以优化内存使用吗?

python dataframe sorting python-polars
1个回答
0
投票

我想你会想完全避免使用列表。

例如,您可以使用

.str.extract()
并使用正则表达式在索引 N 处提取。

>>> pl.select(pl.lit("01;99900000001;F;1;0;000000321").str.extract(r"[^;]+;([^;]+)")).item()
'99900000001' # index 1

如果我用这种方法替换列表实现:

lf = pl.scan_csv(
    "./part_*",
    separator=chr(0000),
    has_header=False,
    new_columns=[ORIGINAL_ROW_COLUMN],
)

lf = lf.filter(
    pl.col(ORIGINAL_ROW_COLUMN).str.extract(rf"([^{SEPARATOR}]+)").is_in(valid_record_codes)
)

lf = lf.with_columns(
    pl.col(ORIGINAL_ROW_COLUMN).str.extract(
        SEPARATOR.join(rf"[^{SEPARATOR}]+" * (index)) # build regex for index N
        + 
        SEPARATOR
        + 
        rf"([^{SEPARATOR}]+)"
    )
    .alias(f"column_{index}") for index in sort_column_indexes
)

(lf.sort(r"^column_\d+$")
   .drop(r"^column_\d+$")
   .sink_csv("output.csv", include_header=False)
)

并将输入文件增加到 20_000_000 行 - 其峰值为 5GB RAM。

原始列表版本的 RAM 峰值为 10GB。

这可能取决于平台/规格,但似乎表明它可能是一种改进的方法。

© www.soinside.com 2019 - 2024. All rights reserved.