如何以非常快的计算时间迭代 python 中的大量 Polars/Pandas 数据帧,同时对每个数据帧执行数据清理/转换?
我的问题:我有一个巨大的 csv 文件列表(例如~1K),每个文件大约 20MB。我已将每个 csv 文件转换为数据帧(我尝试了 pandas 和 Polars 只是为了查看计算时间的差异)并为每个数据帧的数据清理应用一些转换。
执行此操作的有效方法是什么,因为目前我的总计算时间(如果我使用列表理解或映射甚至 for 循环)约为 3 分钟将所有 ~1K csv 文件转换为数据帧 & ~2 分钟对每个数据帧进行转换(即 2x对于所有 1K 数据帧,1K = 2K 分钟)? (我使用的是Python3.11)
以下是我迄今为止尝试过的更多详细信息。
我的 csv 片段(这里仅提到几行和几列以给出一个想法)转换为数据框看起来像这样(我的实际 csv 每个文件有约 10K 行和 400 列)
df = pl.from_repr("""
┌───────┬───────────────┬────────────────────┐
│ Index ┆ A ┆ B │
│ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ str │
╞═══════╪═══════════════╪════════════════════╡
│ 0 ┆ 203 X 345 457 ┆ 346 X X 457 45 46 │
│ 0 ┆ 11 22 44 890 ┆ 22 33 44 88 90 100 │
│ 0 ┆ X X 456 44 90 ┆ null │
│ 1 ┆ null ┆ 33 456 99 10 10 11 │
│ 1 ┆ null ┆ null │
└───────┴───────────────┴────────────────────┘
""")
所以基本上我想把它们改造成这样的东西:
┌───────┬─────────────────────────────────┬────────────────────────────────────────┐
│ Index ┆ A ┆ B │
│ --- ┆ --- ┆ --- │
│ i64 ┆ list[f64] ┆ list[f64] │
╞═══════╪═════════════════════════════════╪════════════════════════════════════════╡
│ 0 ┆ [203.0, null, 345.0, 457.0] ┆ [346.0, null, null, 457.0, 45.0, 46.0] │
│ 0 ┆ [11.0, 22.0, 44.0, 890.0] ┆ [22.0, 33.0, 44.0, 88.0, 90.0, 100.0] │
│ 0 ┆ [null, null, 456.0, 44.0, 90.0] ┆ null │
│ 1 ┆ null ┆ [33.0, 456.0, 99.0, 10.0, 10.0, 11.0] │
│ 1 ┆ null ┆ null │
└───────┴─────────────────────────────────┴────────────────────────────────────────┘
我的代码(这是在极坐标中)到目前为止看起来像这样:
import polars as pl
def convertdf(input):
return pl.read_csv(input)
def applycast(df,col):
dfcast = df.explode(col).select('Index',pl.col(col)).with_columns(pl.col(col).is_not_null()).then(pl.col(col).cast(pl.Float64,strict=False))).group_by('Index').agg(pl.col(col)).sort('Index')
df = df.replace(col, dfcast[col])
return df
def datatransform(df):
df = df.select(df.columns[1:]).select(pl.col(pl.String).str.split(' '))
df = df.pipe(lambda df : reduce(lambda df,col: applycast(df,col), df.columns,df))
return df
csvfiles = ['input1.csv', 'input2.csv', 'input3.csv',....'input1000.csv']
df_files = list(map(convertdf, csvfiles)) #Time taken: 3mins
dftransformedfiles = [df.pipe(datatransform) for df in df_files] #Time taken: ~2K mins
基本上,如您所见,我使用列表理解来循环每个 csv 文件。有什么办法可以并行执行吗?
有没有办法一次性将“applycast()”函数应用于所有列?因为,目前我正在遍历每一列,在我看来,这就是它花费更长时间的原因。虽然我每列的内容有所不同,但数据类型是List[str],需要转换为List[f64]。
我尝试在应用 datatransform() 之前连接所有数据帧,但连接花费了更长的时间。我想过使用“Ray”API进行并行执行,但它不支持最新的Python3.11。如何减少计算时间,或者迭代多列或多数据帧/csv 列表的最佳方法是什么?
我认为
pipe
一般来说,UDF 肯定会很慢。这里不需要replace
。
list.eval
执行您正在寻找的操作,它将列中的每个列表视为自己的系列:
df = pl.DataFrame({"x": [0, 1], "y": ["3 X 4.0", None], "z": ["1 X 2", "X X 5"]})
df.with_columns(
pl.col(pl.String)
.str.split(" ")
.list.eval(pl.element().cast(pl.Int64, strict=False))
)
shape: (2, 3)
┌─────┬─────────────────┬─────────────────┐
│ x ┆ y ┆ z │
│ --- ┆ --- ┆ --- │
│ i64 ┆ list[i64] ┆ list[i64] │
╞═════╪═════════════════╪═════════════════╡
│ 0 ┆ [3, null, null] ┆ [1, null, 2] │
│ 1 ┆ null ┆ [null, null, 5] │
└─────┴─────────────────┴─────────────────┘
因此将
with_columns
操作包装到一个方法中,看看加速是多少:
def data_load_transform(filename):
return pd.read_csv(filename).with_columns(
pl.col(pl.String)
.str.split(" ")
.list.eval(pl.element().cast(pl.Int64, strict=False))
)
csvfiles = ['input1.csv', 'input2.csv', 'input3.csv',....'input1000.csv']
dftransformedfiles = [data_load_transform(fn) for fn in csvfiles]
如果您需要进一步加速,最后的列表理解可以通过各种多处理方式并行完成。
看起来转变是:
(df.with_columns(
pl.col(pl.Utf8).str.strip("'").str.split(" ")
.cast(pl.List(pl.Float64))
)
shape: (5, 3)
┌──────────────────────┬────────────────────────┬───────────────────────┐
│ Index (Dtype: [i64]) ┆ A (Dtype: [str]) ┆ B (Dtype: [str]) │
│ --- ┆ --- ┆ --- │
│ i64 ┆ list[f64] ┆ list[f64] │
╞══════════════════════╪════════════════════════╪═══════════════════════╡
│ 0 ┆ [203.0, null, … 457.0] ┆ [346.0, null, … 46.0] │
│ 0 ┆ [11.0, 22.0, … 890.0] ┆ [22.0, 33.0, … 100.0] │
│ 0 ┆ [null, null, … 90.0] ┆ null │
│ 1 ┆ null ┆ [33.0, 456.0, … 11.0] │
│ 1 ┆ null ┆ null │
└──────────────────────┴────────────────────────┴───────────────────────┘
对于从多个 csv 文件创建数据框,您可以使用
.scan_csv
而不是 .read_csv
。
df = pl.concat(pl.scan_csv(file) for file in csvfiles)
或者,如果您的文件名遵循全局模式:
df = pl.scan_csv("input*.csv")
df = df.with_columns(
pl.col(pl.Utf8).str.strip("'").str.split(" ")
.cast(pl.List(pl.Float64))
)
当
.scan_csv
返回一个 LazyFrame 时,您可以使用 .collect()
来生成结果。