I'm building a simple pipeline to pull a dataset from a database and write it to CSV, so I can access it faster in the future.
Currently, I have this:
import pandas as pd
from tqdm import tqdm

# data loading as a pipeline
# ingests and writes batches of CHUNK_SIZE
CHUNK_SIZE = 10_000
data_loader = pd.read_sql(sql=SQL, con=conn, chunksize=CHUNK_SIZE)
for chunk in tqdm(data_loader, total=chunks):
    chunk.to_csv("data/raw/statanom.pit_train.csv", mode="a")
I've found there's a bottleneck: because this works sequentially, it could be fetching the next chunk while the previous one is being written to disk.
Assuming this is possible and advisable, how do I parallelize these tasks? I'd like to start the next iteration's query while the previous chunk is being written to disk.
You can use Python's
concurrent.futures
module to offload the write to a worker thread (note: ThreadPoolExecutor uses threads, not multiprocessing, which is fine here since the write is I/O-bound).
import pandas as pd
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor

CHUNK_SIZE = 10_000

# adding a function to simplify
def write_chunk(chunk, filename, header):
    chunk.to_csv(filename, mode="a", header=header, index=False)

conn = ...
SQL = ...
chunks = ...

data_loader = pd.read_sql(sql=SQL, con=conn, chunksize=CHUNK_SIZE)
output_file = "data/foo.csv"

# A single worker serializes the appends, so chunks land in order and
# never interleave, while the main thread keeps fetching the next chunk.
# Waiting on the previous future before submitting the next one bounds
# memory to roughly two chunks in flight and surfaces write errors early.
with ThreadPoolExecutor(max_workers=1) as executor:
    pending = None
    for i, chunk in enumerate(tqdm(data_loader, total=chunks)):
        if pending is not None:
            pending.result()
        pending = executor.submit(write_chunk, chunk, output_file, i == 0)
    if pending is not None:
        pending.result()
Hope this helps!
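The overlap pattern generalizes beyond pandas. Here is a minimal, self-contained sketch of the same idea, with a stand-in chunk producer (`produce_chunks`) instead of a real database query and an in-memory buffer instead of a file; both are illustrative placeholders, not part of your setup:

```python
import csv
import io
from concurrent.futures import ThreadPoolExecutor

def produce_chunks(n_chunks, rows_per_chunk):
    # Stand-in for pd.read_sql(..., chunksize=...): yields lists of rows.
    for c in range(n_chunks):
        yield [[c, r] for r in range(rows_per_chunk)]

def write_chunk(chunk, buffer):
    # Stand-in for chunk.to_csv(..., mode="a").
    csv.writer(buffer).writerows(chunk)

def pipeline(buffer, n_chunks=3, rows_per_chunk=2):
    # One worker keeps the writes ordered; waiting on the previous
    # future before submitting the next bounds memory to about two
    # chunks in flight while fetch and write still overlap.
    with ThreadPoolExecutor(max_workers=1) as executor:
        pending = None
        for chunk in produce_chunks(n_chunks, rows_per_chunk):
            if pending is not None:
                pending.result()
            pending = executor.submit(write_chunk, chunk, buffer)
        if pending is not None:
            pending.result()

buf = io.StringIO()
pipeline(buf)
```

Because the executor has a single worker, the output rows appear in exactly the order the chunks were produced.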