我正在开发一个 Python 项目,需要并行处理一个非常大的文件(例如,数 GB 的 CSV 或日志文件)以加快处理速度。但是,我有三个具体要求,使这项任务具有挑战性:
顺序保留:输出必须严格保持与输入文件相同的行顺序。 内存效率:解决方案必须避免将整个文件加载到内存中(例如,逐行或分块读取)。 并发性:处理应利用并行性来有效地处理 CPU 密集型任务。 我目前的方法 我使用
concurrent.futures.ThreadPoolExecutor
来并行化处理,但遇到了以下问题:
虽然
executor.map
按正确的顺序产生结果,但它似乎效率低下,因为即使后面的任务更早完成,任务也必须等待较早的任务完成。
使用 file.readlines()
读取整个文件会消耗太多内存,尤其是对于多 GB 的文件。
这是我尝试过的示例:
import concurrent.futures
def process_line(line):
# Simulate a CPU-bound operation
return line.upper()
with open("large_file.txt", "r") as infile:
lines = infile.readlines()
with concurrent.futures.ThreadPoolExecutor() as executor:
results = list(executor.map(process_line, lines))
with open("output.txt", "w") as outfile:
outfile.writelines(results)
虽然此代码适用于小文件,但由于内存限制和线程使用的潜在低效率,它无法用于较大的文件。
所需的解决方案 我正在寻找一个解决方案:
另外,我想了解:
ThreadPoolExecutor
还是ProcessPoolExecutor
更合适
对于这种情况,考虑到潜在的 CPU 限制性质
任务。
缓冲结果并将结果写入输出文件的最佳实践 不消耗太多内存。
主要挑战*
如何为每一行(或块)分配唯一标识符 在不引入大量开销的情况下维持秩序?
Python 中是否有现有的库或设计模式可以 简化这种大文件的并行处理?
任何解决此问题的见解、示例或最佳实践将不胜感激!
在不知道每项任务具体需要多长时间的情况下,没有人能真正回答使用
ThreadPoolExecutor
还是 ProcessPoolExecutor
会更快。您需要两者都尝试一下,然后基准测试每个方法所花费的时间才能找到更好的方法。
这段代码可以帮助您自己弄清楚这一点,它基于这个答案,但它使用队列来限制正在读取的行,因此如果处理速度很慢,您就不会冒险读取整个文件。
import concurrent.futures
import os
import queue
import threading
from io import IOBase
import time
from typing import Optional
def process_line(line: str):
# Simulate some CPU-bound work on the line
for i in range(int(1e6)):
pass
return line.upper()
def writer_task(out_file: IOBase, writer_queue: queue.Queue):
while True:
fut: Optional[concurrent.futures.Future] = writer_queue.get()
if fut is None:
break
line = fut.result()
out_file.write(line)
print("line written")
# Wrap main script behavior in main function
def main():
t1 = time.time()
with open("large_file.txt") as infile, open("output.txt", "w") as outfile:
with concurrent.futures.ThreadPoolExecutor() as executor:
writer_queue = queue.Queue(maxsize=os.cpu_count() * 2 + 10)
writer = threading.Thread(target=writer_task, args=(outfile, writer_queue), daemon=True)
writer.start()
for line in infile:
print("line read")
writer_queue.put(executor.submit(process_line, line))
writer_queue.put(None)
writer.join()
t2 = time.time()
print(f"time taken = {t2-t1}")
# Invoke main function only when run as script, not when imported or invoked
# as part of spawn-based multiprocessing
if __name__ == '__main__':
main()
您可以轻松地将
ThreadPoolExecutor
替换为 ProcessPoolExecutor
并 测量 哪个更好。您可能想要删除 print("line written")
及其对应项,因为它们仅用于说明目的。