如何在Python中并行处理大量文件,同时保持顺序并优化内存使用?

问题描述 投票:0回答:1

我正在开发一个 Python 项目,需要并行处理一个非常大的文件(例如,数 GB 的 CSV 或日志文件)以加快处理速度。但是,我有三个具体要求,使这项任务具有挑战性:

顺序保留:输出必须严格保持与输入文件相同的行顺序。 内存效率:解决方案必须避免将整个文件加载到内存中(例如,逐行或分块读取)。 并发性:处理应利用并行性来有效地处理 CPU 密集型任务。 我目前的方法 我使用

concurrent.futures.ThreadPoolExecutor
来并行化处理,但遇到了以下问题:

虽然

executor.map
按正确的顺序产生结果,但它似乎效率低下,因为即使后面的任务更早完成,任务也必须等待较早的任务完成。 使用
file.readlines()
读取整个文件会消耗太多内存,尤其是对于多 GB 的文件。 这是我尝试过的示例:

import concurrent.futures

def process_line(line):
    # Simulate a CPU-bound operation
    return line.upper()

with open("large_file.txt", "r") as infile:
    lines = infile.readlines()

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = list(executor.map(process_line, lines))

with open("output.txt", "w") as outfile:
    outfile.writelines(results)

虽然此代码适用于小文件,但由于内存限制和线程使用的潜在低效率,它无法用于较大的文件。

所需的解决方案 我正在寻找一个解决方案:

  • 并行处理线路以利用多个 CPU 核心或 线程。
  • 确保输出行的写入顺序与输入的顺序相同 文件。
  • 以节省内存的方式读取和处理文件(例如, 流式处理或基于块的处理)。

另外,我想了解:

  • ThreadPoolExecutor
    还是
    ProcessPoolExecutor
    更合适 对于这种情况,考虑到潜在的 CPU 限制性质 任务。

  • 缓冲结果并将结果写入输出文件的最佳实践 不消耗太多内存。

主要挑战*

  • 如何为每一行(或块)分配唯一标识符 在不引入大量开销的情况下维持秩序?

  • Python 中是否有现有的库或设计模式可以 简化这种大文件的并行处理?

任何解决此问题的见解、示例或最佳实践将不胜感激!

python multithreading parallel-processing file-handling memory-efficient
1个回答
0
投票

在不知道每项任务具体需要多长时间的情况下,没有人能真正回答使用

ThreadPoolExecutor
还是
ProcessPoolExecutor
会更快。您需要两者都尝试一下,然后基准测试每个方法所花费的时间才能找到更好的方法。

这段代码可以帮助您自己弄清楚这一点,它基于这个答案,但它使用队列来限制正在读取的行,因此如果处理速度很慢,您就不会冒险读取整个文件。

import concurrent.futures
import os
import queue
import threading
from io import IOBase
import time
from typing import Optional

def process_line(line: str):
    # Simulate some CPU-bound work on the line
    for i in range(int(1e6)):
        pass
    return line.upper()

def writer_task(out_file: IOBase, writer_queue: queue.Queue):
    while True:
        fut: Optional[concurrent.futures.Future] = writer_queue.get()
        if fut is None:
            break
        line = fut.result()
        out_file.write(line)
        print("line written")

# Wrap main script behavior in main function
def main():
    t1 = time.time()
    with open("large_file.txt") as infile, open("output.txt", "w") as outfile:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            writer_queue = queue.Queue(maxsize=os.cpu_count() * 2 + 10)
            writer = threading.Thread(target=writer_task, args=(outfile, writer_queue), daemon=True)
            writer.start()
            for line in infile:
                print("line read")
                writer_queue.put(executor.submit(process_line, line))
            writer_queue.put(None)
            writer.join()
    t2 = time.time()
    print(f"time taken = {t2-t1}")

# Invoke main function only when run as script, not when imported or invoked
# as part of spawn-based multiprocessing
if __name__ == '__main__':
    main()

您可以轻松地将

ThreadPoolExecutor
替换为
ProcessPoolExecutor
测量 哪个更好。您可能想要删除
print("line written")
及其对应项,因为它们仅用于说明目的。

© www.soinside.com 2019 - 2024. All rights reserved.