如何加快使用 Polars/pandas 在 Python 中的数据帧列表上应用数据清理和转换的计算时间?

问题描述 投票:0回答:2

如何以非常快的计算时间迭代 python 中的大量 Polars/Pandas 数据帧,同时对每个数据帧执行数据清理/转换?

我的问题:我有一个巨大的 csv 文件列表(例如~1K),每个文件大约 20MB。我已将每个 csv 文件转换为数据帧(我尝试了 pandas 和 Polars 只是为了查看计算时间的差异)并为每个数据帧的数据清理应用一些转换。

执行此操作的有效方法是什么,因为目前我的总计算时间(如果我使用列表理解或映射甚至 for 循环)约为 3 分钟将所有 ~1K csv 文件转换为数据帧 & ~2 分钟对每个数据帧进行转换(即 2x对于所有 1K 数据帧,1K = 2K 分钟)? (我使用的是Python3.11)

以下是我迄今为止尝试过的更多详细信息。

我的 csv 片段(这里仅提到几行和几列以给出一个想法)转换为数据框看起来像这样(我的实际 csv 每个文件有约 10K 行和 400 列)

df = pl.from_repr("""
┌───────┬───────────────┬────────────────────┐
│ Index ┆ A             ┆ B                  │
│ ---   ┆ ---           ┆ ---                │
│ i64   ┆ str           ┆ str                │
╞═══════╪═══════════════╪════════════════════╡
│ 0     ┆ 203 X 345 457 ┆ 346 X X 457 45 46  │
│ 0     ┆ 11 22 44 890  ┆ 22 33 44 88 90 100 │
│ 0     ┆ X X 456 44 90 ┆ null               │
│ 1     ┆ null          ┆ 33 456 99 10 10 11 │
│ 1     ┆ null          ┆ null               │
└───────┴───────────────┴────────────────────┘
""")

所以基本上我想把它们改造成这样的东西:

┌───────┬─────────────────────────────────┬────────────────────────────────────────┐
│ Index ┆ A                               ┆ B                                      │
│ ---   ┆ ---                             ┆ ---                                    │
│ i64   ┆ list[f64]                       ┆ list[f64]                              │
╞═══════╪═════════════════════════════════╪════════════════════════════════════════╡
│ 0     ┆ [203.0, null, 345.0, 457.0]     ┆ [346.0, null, null, 457.0, 45.0, 46.0] │
│ 0     ┆ [11.0, 22.0, 44.0, 890.0]       ┆ [22.0, 33.0, 44.0, 88.0, 90.0, 100.0]  │
│ 0     ┆ [null, null, 456.0, 44.0, 90.0] ┆ null                                   │
│ 1     ┆ null                            ┆ [33.0, 456.0, 99.0, 10.0, 10.0, 11.0]  │
│ 1     ┆ null                            ┆ null                                   │
└───────┴─────────────────────────────────┴────────────────────────────────────────┘

我的代码(这是在极坐标中)到目前为止看起来像这样:

import polars as pl  

def convertdf(input):  
    return pl.read_csv(input)  

def applycast(df,col):  
    dfcast     = df.explode(col).select('Index',pl.col(col)).with_columns(pl.col(col).is_not_null()).then(pl.col(col).cast(pl.Float64,strict=False))).group_by('Index').agg(pl.col(col)).sort('Index')  
    df = df.replace(col,    dfcast[col]) 
    return df  

def datatransform(df):  
   df =     df.select(df.columns[1:]).select(pl.col(pl.String).str.split(' ')) 
   df = df.pipe(lambda df : reduce(lambda df,col: applycast(df,col), df.columns,df))  
   return df  

csvfiles = ['input1.csv', 'input2.csv', 'input3.csv',....'input1000.csv'] 
df_files = list(map(convertdf, csvfiles)) #Time taken: 3mins 
dftransformedfiles = [df.pipe(datatransform) for df in df_files] #Time taken: ~2K mins 

基本上,如您所见,我使用列表理解来循环每个 csv 文件。有什么办法可以并行执行吗?

有没有办法一次性将“applycast()”函数应用于所有列?因为,目前我正在遍历每一列,在我看来,这就是它花费更长时间的原因。虽然我每列的内容有所不同,但数据类型是List[str],需要转换为List[f64]。

我尝试在应用 datatransform() 之前连接所有数据帧,但连接花费了更长的时间。我想过使用“Ray”API进行并行执行,但它不支持最新的Python3.11。如何减少计算时间,或者迭代多列或多数据帧/csv 列表的最佳方法是什么?

performance parallel-processing python-polars ray python-3.11
2个回答
1
投票

我认为

pipe
一般来说,UDF 肯定会很慢。这里不需要
replace

我会使用

list.eval
执行您正在寻找的操作,它将列中的每个列表视为自己的系列:

df = pl.DataFrame({"x": [0, 1], "y": ["3 X 4.0", None], "z": ["1 X 2", "X X 5"]})
df.with_columns(
    pl.col(pl.String)
    .str.split(" ")
    .list.eval(pl.element().cast(pl.Int64, strict=False))
)
shape: (2, 3)
┌─────┬─────────────────┬─────────────────┐
│ x   ┆ y               ┆ z               │
│ --- ┆ ---             ┆ ---             │
│ i64 ┆ list[i64]       ┆ list[i64]       │
╞═════╪═════════════════╪═════════════════╡
│ 0   ┆ [3, null, null] ┆ [1, null, 2]    │
│ 1   ┆ null            ┆ [null, null, 5] │
└─────┴─────────────────┴─────────────────┘

因此将

with_columns
操作包装到一个方法中,看看加速是多少:

def data_load_transform(filename):
  return pd.read_csv(filename).with_columns(
      pl.col(pl.String)
      .str.split(" ")
      .list.eval(pl.element().cast(pl.Int64, strict=False))
  )

csvfiles = ['input1.csv', 'input2.csv', 'input3.csv',....'input1000.csv'] 
dftransformedfiles = [data_load_transform(fn) for fn in csvfiles]

如果您需要进一步加速,最后的列表理解可以通过各种多处理方式并行完成。


1
投票

看起来转变是:

(df.with_columns(
   pl.col(pl.Utf8).str.strip("'").str.split(" ")
     .cast(pl.List(pl.Float64))
)
shape: (5, 3)
┌──────────────────────┬────────────────────────┬───────────────────────┐
│ Index (Dtype: [i64]) ┆ A (Dtype: [str])       ┆ B (Dtype: [str])      │
│ ---                  ┆ ---                    ┆ ---                   │
│ i64                  ┆ list[f64]              ┆ list[f64]             │
╞══════════════════════╪════════════════════════╪═══════════════════════╡
│ 0                    ┆ [203.0, null, … 457.0] ┆ [346.0, null, … 46.0] │
│ 0                    ┆ [11.0, 22.0, … 890.0]  ┆ [22.0, 33.0, … 100.0] │
│ 0                    ┆ [null, null, … 90.0]   ┆ null                  │
│ 1                    ┆ null                   ┆ [33.0, 456.0, … 11.0] │
│ 1                    ┆ null                   ┆ null                  │
└──────────────────────┴────────────────────────┴───────────────────────┘

对于从多个 csv 文件创建数据框,您可以使用

.scan_csv
而不是
.read_csv

df = pl.concat(pl.scan_csv(file) for file in csvfiles)

或者,如果您的文件名遵循全局模式:

df = pl.scan_csv("input*.csv")

df = df.with_columns(
   pl.col(pl.Utf8).str.strip("'").str.split(" ")
     .cast(pl.List(pl.Float64))
)

.scan_csv
返回一个 LazyFrame 时,您可以使用
.collect()
来生成结果。

© www.soinside.com 2019 - 2024. All rights reserved.