What is the most efficient way to multiprocess a very large dataframe?

Problem description

I have a large dataframe that I need to run a lot of matching operations on, and I have been using the approach below to do so. However, the dataframe I am currently trying to multiprocess is a 2 GB CSV file, and my computer struggles with the multiprocessing even with just one partition. I assume this is because splitting the dataframe into chunks for multiprocessing doubles the amount of memory required, so my machine can't handle it. This is my current code:

from concurrent.futures import ProcessPoolExecutor

import numpy as np
import pandas as pd
from tqdm import tqdm

def parallelize_dataframe(df, func, additional_param, num_partitions):
    # Split the dataframe into chunks and hand each chunk to its own worker process
    df_split = np.array_split(df, num_partitions)
    results = []
    with ProcessPoolExecutor(max_workers=num_partitions) as executor:
        futures = {executor.submit(func, chunk, additional_param): chunk for chunk in df_split}
        for future in tqdm(futures, total=len(futures), desc="Overall progress"):
            results.append(future.result())
    return pd.concat(results)

Any help is greatly appreciated.

python pandas multiprocessing
1 Answer

For a task like this I would suggest preprocessing the csv file by splitting it into roughly equal chunks that are read by the child processes, rather than being read by the main process and sent to the children. Sending data from the parent to the children carries quite a bit of overhead (and memory). Here's an example:

from multiprocessing import Pool
from io import BytesIO
import pandas as pd

csvfile = r"c:\some\example\data.csv"  # raw string so the backslashes aren't treated as escape sequences

chunksize = 2**20  # 1MiB chunk size (try different values depending on file size and processing speed)

#example csv contents
# colA,colB,colC
# 1,2,3
# 4,5,6
# 7,8,9
# ...

def sum_cols(args):
    file_start, file_end, col_names = args  # unpack tuple args as Pool.imap only passes a single argument
    with open(csvfile, "rb") as f:
        f.seek(file_start)
        buf = BytesIO(f.read(file_end-file_start))  # store chunk of csv in a buffer to pass to pandas
    df = pd.read_csv(buf, names=col_names)  # col_names aren't in the chunk so pass them explicitly
    return df.sum()
        
if __name__ == "__main__":
    with open(csvfile, "rb") as f:
        firstline = f.readline()
        headers = [col_title.strip() for col_title in firstline.decode().split(",")]  # decode bytes before splitting on commas
        startpoints = []
        endpoints = []
        while True:  # scan the file without reading in much data to find good chunk boundaries (start and end on newline)
            startpoints.append(f.tell())
            f.seek(chunksize,1)  # skip ahead by chunksize bytes
            line = f.readline()  # find the end of the current line
            endpoints.append(f.tell())
            if not line: break  # empty line indicates end of the file
    
    arglist = [(start, end, headers) for start, end in zip(startpoints, endpoints)]
    with Pool() as pool:
        print(sum(pool.imap(sum_cols, arglist)))
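
If each chunk needs more than a column sum (for example the matching operations in the question), the same byte-offset idea can be plugged back into the original parallelize_dataframe pattern: each worker reads its own slice of the file, processes it, and returns a (usually much smaller) result DataFrame for the parent to concatenate. A rough sketch, where do_matching is a hypothetical stand-in for the actual matching function:

def process_chunk(args):
    # Same offset-based reading as sum_cols above, but the worker returns a
    # processed DataFrame instead of a column sum.
    file_start, file_end, col_names = args
    with open(csvfile, "rb") as f:
        f.seek(file_start)
        buf = BytesIO(f.read(file_end - file_start))
    df = pd.read_csv(buf, names=col_names)
    return do_matching(df)  # hypothetical per-chunk matching work

# run inside the if __name__ == "__main__" block, after arglist is built
with Pool() as pool:
    result = pd.concat(pool.imap(process_chunk, arglist))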

I haven't used polars, so I can't base an answer on it, but my understanding is that it is very fast.
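
For reference, here is a minimal, untested sketch of how the same column-sum task might look in polars; scan_csv builds a lazy query, so the whole file does not have to be loaded into memory at once (csvfile is the same path as above):

import polars as pl

# Lazily scan the csv and sum every column; polars parallelizes the work
# internally, so no explicit multiprocessing is needed for this kind of task.
sums = pl.scan_csv(csvfile).select(pl.all().sum()).collect()
print(sums)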
