Initialization of concurrent.futures.ProcessPoolExecutor?

Question · Votes: 6 · Answers: 2

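I'm planning to use concurrent.futures.ProcessPoolExecutor to run functions in parallel. According to the documentation, its executor object can only accept a simple function in map. In my case, some initialization (loading data) has to happen before the "to-be-parallelized" function runs. How do I arrange that?

The "to-be-parallelized" function is called many times in one iteration. I don't want it to be re-initialized on every call.

In other words, there is an init function that produces some output for this tbp function. Each child process should have its own copy of that output, because the function depends on it.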

python python-3.x concurrency multiprocessing subprocess
2 Answers
6 votes

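It sounds like you're looking for an equivalent of the initializer / initargs options that multiprocessing.Pool accepts. At the time of writing (before Python 3.7), concurrent.futures.ProcessPoolExecutor did not offer that behavior, although there was a patch waiting for review that would add it.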

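So you can either use multiprocessing.Pool (which might be fine for your use case), wait for that patch to be merged and released (you might be waiting a while :)), or roll your own solution. It turns out it's not too hard to write a wrapper function for map that takes an initializer but only calls it once per process: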

import os
from concurrent.futures import ProcessPoolExecutor
from functools import partial

inited = False
initresult = None

def initwrapper(initfunc, initargs, f, x):
    # This runs in the child process. inited is False the
    # first time it's called, and then stays True for every
    # later call in the same worker process, so initfunc
    # runs only once per worker.
    global inited, initresult
    if not inited:
        inited = True
        initresult = initfunc(*initargs)
    return f(x)

def do_init(a, b):
    print('ran init {} {}'.format(a, b))
    return os.getpid() # Just to demonstrate it will be unique per process

def f(x):
    print("Hey there {}".format(x))
    print('initresult is {}'.format(initresult))
    return x+1

def initmap(executor, initializer, initargs, f, it):
    return executor.map(partial(initwrapper, initializer, initargs, f), it)


if __name__ == "__main__":
    with ProcessPoolExecutor(4) as executor:
        out = initmap(executor, do_init, (5,6), f, range(10))
    print(list(out))

Output:

ran init 5 6
Hey there 0
initresult is 4568
ran init 5 6
Hey there 1
initresult is 4569
ran init 5 6
Hey there 2
initresult is 4570
Hey there 3
initresult is 4569
Hey there 4
initresult is 4568
ran init 5 6
Hey there 5
initresult is 4571
Hey there 6
initresult is 4570
Hey there 7
initresult is 4569
Hey there 8
initresult is 4568
Hey there 9
initresult is 4570
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

1 vote

As of Python 3.7, both ThreadPoolExecutor and ProcessPoolExecutor accept the optional initializer and initargs arguments. Each thread/process calls initializer(*initargs) after it starts.

See the concurrent.futures documentation.
