单线程文件加载和多处理

问题描述 投票:0回答:1

我有一个大数据文件(几 GB 到几十 GB),我想在 python 中以多个线程读取和处理该文件。

我目前的方法是分部分读取文件(假设为 100MB)并将该信息传递给线程,如下所示:

from multiprocessing import Pool

with Pool(processes=8) as pool:

    file_chunk = readFileInJunks() #iterator

    for i in range(8):
        pool.apply_async(f, (next(file_chunk),))

    pool.close()
    pool.join()

这都是有趣的游戏,直到文件比我的内存大。所以我的问题是:如何避免遇到内存限制。

我可以让每个进程自行加载其数据块,但这意味着 n 个进程将同时访问同一个文件,可能会减慢读取速度(尤其是对于较旧的磁盘)。另一种选择是在插槽空闲时手动调用 apply_async,这有点违背了线程池的全部意义。

有更优雅的方法来处理这个问题吗?

python multiprocessing python-multiprocessing
1个回答
0
投票

您需要做的是将文件分解成更小的块,以便您的 8 个池进程有足够的内存一次处理 8 个块。但是您向池提交块的速度可能比池处理块的速度更快。这将导致池的任务队列填满这些块,等待空闲池进程对其进行处理。因此,我们需要一种机制来限制块的读取和提交到池中。选择一个块大小可能很有用,这样可以同时处理 8 个块,同时还有 8 个额外的块位于任务队列上等待处理,这样池进程完成块上的工作和能够执行任务之间就没有延迟。开始处理新块。这意味着您应该选择一个块大小,以便内存可以容纳 16 个块。

但是我们如何限制向池中提交任务(块),以便内存中的块永远不会超过 16 个?通过使用正确初始化的信号量。在下面的代码中,主进程将能够立即向池中提交 16 个块进行处理,但将被阻止提交下一个块,直到池进程完成处理先前提交的块:

from multiprocessing import Pool
from threading import Semaphore

NUM_PROCESSES = 8  # The pool size

# So that when a pool process completes work on a chunk, there is
# always at least another chunk immediately available in the task queue
# to work on:
SUBMITTED_CHUNKS_COUNT = 2 * NUM_PROCESSES

# Chose a chunksize so that SUBMITTED_CHUNKS_COUNT * CHUNK_SIZE fits in memory:
CHUNK_SIZE = 10  # For demo purposes (not a realistic vaue)

def read_file_in_chunks():  # Use name that conforms to PEP8 specification
    # For demo purposes just return the same string repeatedly:
    for _ in range(30):
        yield "abcdefghij"  # string of length CHUNK_SIZE

def f(chunk):
    ...  # Do something with chunk
    print(chunk)

def main():
    semaphore = Semaphore(SUBMITTED_CHUNKS_COUNT)

    def task_completed(result):
        """Called when a chunk has finished being processed."""

        # For now we do not care about the result
        semaphore.release()  # Allow a new chunk to be submitted

    with Pool(processes=NUM_PROCESSES) as pool:
        for chunk in read_file_in_chunks():
            semaphore.acquire()  # Throttle submissions
            pool.apply_async(f, args=(chunk,), callback=task_completed)

        pool.close()
        pool.join()

if __name__ == '__main__':
    main()

如果您更喜欢较大的块,并且愿意接受进程在能够处理新块之前完成对块的处理时会发生的轻微延迟,则在上面的代码中设置

SUBMITTED_CHUNKS_COUNT = NUM_PROCESSES
,然后您可以使用块大小是原来的两倍。

© www.soinside.com 2019 - 2024. All rights reserved.