提高运行大文件的性能-Python

问题描述 投票:0回答:1

我知道有关此主题的一些问题,但我似乎无法使其有效进行。我的机器上运行着很大的输入数据集(2-3 GB),其中包含8GB的内存。我正在使用安装了spyderpandas 0.24.0版本。当前输入文件大约需要一个小时才能生成大约10MB的输出文件。

我一直试图通过使用下面的代码对输入文件进行分块来优化过程。本质上,我将输入文件chunk分成较小的段,通过一些代码运行它并导出较小的输出。然后,我删除分块的信息以释放内存。但是内存仍然会在整个操作过程中建立起来,并最终花费相似的时间。我不确定自己在做什么错:

文件的内存使用情况详细信息是:

RangeIndex: 5471998 entries, 0 to 5471997
Data columns (total 17 columns):
col1                     object
col2                     object
col3                     object
....
dtypes: object(17)
memory usage: 5.6 GB

我通过将cols_to_keep传递到use_cols来对df进行子集化。但是每个文件的标题都不同,因此我使用位置索引来获取相关的标题。

# Because the column headers change from file to file I use location indexing to read the col headers I need
df_cols = pd.read_csv('file.csv')
# Read cols to be used
df_cols = df_cols.iloc[:,np.r_[1,3,8,12,23]]
# Export col headers
cols_to_keep = df_cols.columns

PATH = '/Volume/Folder/Event/file.csv'
chunksize = 10000

df_list = [] # list to hold the batch dataframe

for df_chunk in pd.read_csv(PATH, chunksize = chunksize, usecols = cols_to_keep):
    # Measure time taken to execute each batch
    print("summation download chunk time: " , time.clock()-t)

    # Execute func1
    df1 = func1(df_chunk)

    # Execute func2
    df2 = func1(df1)

    # Append the chunk to list and merge all
    df_list.append(df2) 

# Merge all dataframes into one dataframe
df = pd.concat(df_list)

# Delete the dataframe list to release memory
del df_list
del df_chunk

我尝试使用dask,但使用简单的pandas方法会遇到各种错误。

import dask.dataframe as ddf

df_cols = pd.read_csv('file.csv')
df_cols = df_cols.iloc[:,np.r_[1:3,8,12,23:25,32,42,44,46,65:67,-5:0,]]
cols_to_keep = df_cols.columns

PATH = '/Volume/Folder/Event/file.csv'
blocksize = 10000


df_list = [] # list to hold the batch dataframe


df_chunk = ddf.read_csv(PATH, blocksize = blocksize, usecols = cols_to_keep, parse_dates = ['Time']):
    print("summation download chunk time: " , time.clock()-t)

    # Execute func1
    df1 = func1(df_chunk)

    # Execute func2
    df2 = func1(df1)

    # Append the chunk to list and merge all
    df_list.append(df2)


    delayed_results = [delayed(df2) for df_chunk in df_list]

抛出错误的行:

df1 = func1(df_chunk)

  name_unq = df['name'].dropna().unique().tolist()

AttributeError: 'Series' object has no attribute 'tolist'

我已经通过了许多功能,但它仍然会继续引发错误。

python pandas memory chunked-encoding
1个回答
0
投票

要处理文件,请使用dask,它仅用于处理包含大文件(实际上是very个大文件)。

它还具有read_csv函数,带有附加的[[blocksize参数,用于定义单个块的大小。

read_csv

的结果在概念上是单个(dask)DataFrame,由partitions序列组成,实际上是pandasonic DataFrames。然后您可以使用

map_partitions

函数来应用您的函数到每个分区。因为此函数(传递给map_partitions)在单个分区(pandasonic DataFrame),则可以使用任何代码先前在Pandas环境中测试过。此解决方案的优点是处理单个分区在可用内核之间分配,而

Pandas

仅使用单个内核。因此您的循环应重新设计为:

  • read_csv
dask版本),
  • map_partitions
  • 从每个分区生成部分结果,
  • concat
  • -连接这些部分结果(目前为结果仍然是dask DataFrame,将其转换为
  • pandasonic
  • DataFrame,
  • Pandas
  • 环境中进一步使用它。要获取更多详细信息,请阅读有关

    dask

    © www.soinside.com 2019 - 2024. All rights reserved.