A read/write pipeline in pandas

Problem description · votes: 0 · answers: 1

I'm building a simple pipeline that pulls a dataset from a database and writes it to CSV so I can access it faster in the future.

Currently, I have this:

# data loading as a pipeline
# ingests and writes batches of CHUNK_SIZE
CHUNK_SIZE = 10_000
data_loader = pd.read_sql(sql=SQL, con=conn, chunksize=CHUNK_SIZE)
for chunk in tqdm(data_loader, total=chunks):
    chunk.to_csv("data/raw/statanom.pit_train.csv", mode="a")
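As an aside, `to_csv(mode="a")` with the defaults repeats the header row and writes the index column for every appended chunk. A minimal sketch of append-mode writing that emits the header only once, using a hypothetical in-memory SQLite table in place of the real database:

```python
import os
import sqlite3
import tempfile
import pandas as pd

# hypothetical stand-in for the real database: an in-memory SQLite table
conn = sqlite3.connect(":memory:")
pd.DataFrame({"a": range(4)}).to_sql("t", conn, index=False)

path = os.path.join(tempfile.mkdtemp(), "out.csv")
first = True
for chunk in pd.read_sql("SELECT a FROM t", con=conn, chunksize=2):
    # header only on the first chunk, no index column
    chunk.to_csv(path, mode="a", header=first, index=False)
    first = False

print(open(path).read())  # one header row, then 0..3
```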

I've noticed a bottleneck: because this runs sequentially, it could be fetching the next chunk while the previous chunk is being written to disk, but it isn't.

Assuming this is possible and advisable, how do I parallelize these tasks? I'd like to start the next iteration's query while the previous chunk is being written to disk.

python pandas parallel-processing file-io export-to-csv
1 Answer

0 votes

You can use Python's

concurrent.futures
module to overlap the write with the next read using a thread pool.

import pandas as pd
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor

CHUNK_SIZE = 10_000

# write one chunk; only the first chunk carries the header row
def write_chunk(chunk, filename, header):
    chunk.to_csv(filename, mode='a', header=header, index=False)

conn = ...
SQL = ...
chunks = ...

data_loader = pd.read_sql(sql=SQL, con=conn, chunksize=CHUNK_SIZE)

output_file = "data/foo.csv"

# a single writer thread keeps the appends in order; the main thread
# is free to fetch the next chunk while the previous one is written
with ThreadPoolExecutor(max_workers=1) as executor:
    future = None
    for i, chunk in enumerate(tqdm(data_loader, total=chunks)):
        if future is not None:
            future.result()  # wait for the previous write to finish
        future = executor.submit(write_chunk, chunk, output_file, i == 0)
    if future is not None:
        future.result()  # flush the final write
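The `chunks = ...` placeholder is left to the reader; one common way to fill it in, sketched here against a hypothetical in-memory SQLite table, is a cheap COUNT(*) query divided by the chunk size so tqdm gets an accurate total:

```python
import math
import sqlite3
import pandas as pd

CHUNK_SIZE = 10

# hypothetical stand-in for the real database
conn = sqlite3.connect(":memory:")
pd.DataFrame({"x": range(25)}).to_sql("pit_train", conn, index=False)

# one cheap COUNT(*) gives tqdm an accurate total
row_count = pd.read_sql("SELECT COUNT(*) AS n FROM pit_train", con=conn)["n"].iloc[0]
chunks = math.ceil(row_count / CHUNK_SIZE)
print(chunks)  # 25 rows in chunks of 10 -> 3
```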

Hope this helps!

© www.soinside.com 2019 - 2024. All rights reserved.