在python中的for循环中启动多进程池

问题描述 投票:-1回答:2

所以我编写了一个收集目录中文件列表的函数。将它们切成4的大小并使用for循环馈送到多进程池。以下是clairity的代码。

def Main():

    allft_files = listdir(path_allft)
    ncores = cpu_count()

    start = datetime.datetime.now()

    for i in range(0, len(allft_files ), ncores):

        chuncks = [allft_files[x:x + 1] for x in range(i, i+4, 1)]
        pool_processes = Pool(processes=ncores)
        pool_processes.map(filter_allft, chuncks)

我的问题是在下一次迭代开始之前是否所有进程都已完成并加入。或者,当其中一个进程完成时,脚本会跳转到for循环中的下一个迭代。因为每个过程的处理时间略有不同。我不确定游泳池在这方面是如何运作的。

python multithreading multiprocessing
2个回答
0
投票

Pool.map(强调我的)的文档:

map()内置函数的并行等价物(它只支持一个可迭代的参数)。它会阻塞,直到结果准备就绪。

在每个元素准备好之前,结果都无法准备好。只有在流程池中的工作完成后,每个元素才会准备就绪。因此,循环的最后一行:

pool_processes.map(filter_allft, chuncks)

直到filter_allft被应用于chuncks的每个元素才会完成。只有在发生这种情况后才会执行循环的下一次迭代。

但是,您的示例中没有明确清理进程池的代码(终止其进程或加入它们)。因此,它们只会在垃圾收集器收集池时终止。当新池替换pool_processes变量中的旧池时,很可能会在循环的下一次迭代中发生这种情况。

因此,虽然在下一次迭代开始之前所有工作都已完成,但在下一次迭代开始之前,这些过程将不会被清除。如果池由引用计数系统清理,则至少在下一次迭代创建更多进程之前终止其进程。如果您必须依赖循环检测器,则可能会同时拥有多个活动池。

为避免这种情况,您可以添加一个明确的:

pool_processes.terminate()
pool_processes.join()

到循环结束。


1
投票

multiprocessing文档让我发疯。池工作并返回结果。 map函数会扇出一系列任务并等待所有任务完成,以便它可以汇编并返回所有结果。你认为文档会提到这一点!因此,正如您所怀疑的那样,在开始下一个数据集之前,map必须完全完成。

您的代码有一个错误 - 您在每次迭代中设置一个新池,它只是放弃旧池并且不必要地昂贵。至少,将池创建移出for

听起来你会从使用其中一个异步调用中受益。 mapapply调用会立即返回一个ApplyResult对象,您可以使用该对象等待结果。以下是您关心结果的几个示例。

import multiprocessing as mp
import time
import random
import contextlib

def worker(i,j):
    time.sleep(random.random())
    print('done', i, j)

if __name__ == "__main__":

    # The Pool context manager terminates the pool (killing workers)
    # but we just want to close (letting workers finish) and join.
    with contextlib.closing(mp.Pool(8)) as pool:
        for i in range(5):
            for j in range(20):
                pool.apply_async(worker, args=(i,j))
    pool.join()

    print("\nIf you want to process the results\n")
    results = []

    with contextlib.closing(mp.Pool(8)) as pool:
        for i in range(5):
            for j in range(20):
                results.append(pool.apply_async(worker, args=(i,j)))
    for result in results:
        result.get()
    pool.join()
© www.soinside.com 2019 - 2024. All rights reserved.