使用本地函数进行多处理的解决方法

问题描述 投票:0回答:2

我正在为一个对外部依赖关系非常挑剔的客户移植一个库。

该库中的大部分多处理由 pathos ProcessPool 模块支持。主要原因是它可以非常轻松地处理本地定义的函数。

我试图在不强制这种依赖性(或必须重写库的大块)的情况下恢复其中一些功能。我知道以下代码有效,因为该函数是在顶层定义的:

import multiprocessing as mp


def f(x):
    return x * x


def main():
    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))


if __name__ == "__main__":
    main()

以下代码(这是我需要工作的代码)失败,因为该函数仅在本地范围内定义:

import multiprocessing as mp


def main():
    def f(x):
        return x * x

    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))


if __name__ == "__main__":
    main()

在我的例子中(不是上面的玩具示例)我的生成器中的对象是不可编组的。

对于这个不需要外部依赖的特定用例,什么是一个好的解决方法?

  • 有一个使用 fork 的解决方法,但这对于 Mac 和 Windows 来说是不安全的(感谢 @Monica 和 @user2357112)。
  • @amsh 提供了一种似乎适用于任何函数+生成器的解决方法。虽然这是一个很好的选择,但缺点是它需要在全局范围内定义函数。
python multiprocessing
2个回答
3
投票

主要问题是闭包变量。

如果你没有这些,可以这样做:

import marshal
import multiprocessing
import types
from functools import partial


def main():
    def internal_func(c):
        return c*c

    with multiprocessing.Pool(5) as pool:
        print(internal_func_map(pool, internal_func, [i for i in range(10)]))


def internal_func_map(pool, f, gen):
    marshaled = marshal.dumps(f.__code__)
    return pool.map(partial(run_func, marshaled=marshaled), gen)


def run_func(*args, **kwargs):
    marshaled = kwargs.pop("marshaled")
    func = marshal.loads(marshaled)

    restored_f = types.FunctionType(func, globals())
    return restored_f(*args, **kwargs)


if __name__ == "__main__":
    main()

这个想法是,函数代码具有在新进程中运行它所需的一切。请注意,不需要外部依赖项,只需要常规的 python 库。

如果确实需要闭包,那么这个解决方案最困难的部分实际上是创建它们。 (在闭包中,有一个叫做“单元”的东西,它不太容易通过代码创建......)

这是有点复杂的工作代码:

import marshal
import multiprocessing
import pickle
import types
from functools import partial


class A:
    def __init__(self, a):
        self.a = a


def main():
    x = A(1)

    def internal_func(c):
        return x.a + c

    with multiprocessing.Pool(5) as pool:
        print(internal_func_map(pool, internal_func, [i for i in range(10)]))


def internal_func_map(pool, f, gen):
    closure = f.__closure__
    marshaled_func = marshal.dumps(f.__code__)
    pickled_closure = pickle.dumps(tuple(x.cell_contents for x in closure))
    return pool.map(partial(run_func, marshaled_func=marshaled_func, pickled_closure=pickled_closure), gen)


def run_func(*args, **kwargs):
    marshaled_func = kwargs.pop("marshaled_func")
    func = marshal.loads(marshaled_func)
    pickled_closure = kwargs.pop("pickled_closure")
    closure = pickle.loads(pickled_closure)

    restored_f = types.FunctionType(func, globals(), closure=create_closure(func, closure))
    return restored_f(*args, **kwargs)


def create_closure(func, original_closure):
    indent = " " * 4
    closure_vars_def = f"\n{indent}".join(f"{name}=None" for name in func.co_freevars)
    closure_vars_ref = ",".join(func.co_freevars)
    dynamic_closure = "create_dynamic_closure"
    s = (f"""
def {dynamic_closure}():
    {closure_vars_def}
    def internal():
        {closure_vars_ref}
    return internal.__closure__
""")
    exec(s)
    created_closure = locals()[dynamic_closure]()
    for closure_var, value in zip(created_closure, original_closure):
        closure_var.cell_contents = value
    return created_closure


if __name__ == "__main__":
    main()

希望对您有所帮助或至少为您提供一些如何解决此问题的想法!


1
投票

原答案

免责声明:如果您想在本地定义函数以实现更好的代码管理,但可以接受其全局范围,则此答案适用

您可以在定义函数之前使用 global 关键字。它将解决腌制函数的问题(因为它现在是全局函数),同时在本地范围内定义它。

import multiprocessing as mp

def main():
    global f
    def f(x):
        return x * x

    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))

if __name__ == "__main__":
    main()
    print(f(4)) #Inner function is available here as well.

输出:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
16

添加另一个具有多个同名函数的示例,每个后续函数都会覆盖前一个函数。

import multiprocessing as mp

def main():
    global f
    def f(x):
        return x * x

    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))

def main2():
    global f
    def f(x):
        return x * x * x

    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))

if __name__ == "__main__":
    main()
    main2()
    print(f(4))

输出:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
64

更新答案

调用map后撤销全局状态。感谢@KCQs 在评论中的提示。

为了确保全局函数不会对其余代码造成任何问题,您只需为全局函数添加 del 语句即可撤销其全局状态。

import multiprocessing as mp

def main():
    global f
    def f(x):
        return x * x

    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))
    del f

if __name__ == "__main__":
    main()
    print(f(4)) #Inner function is not available.

输出:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Traceback (most recent call last):
  File "<file>.py", line 25, in <module>
    print(f(4))
NameError: name 'f' is not defined

虽然Python会自动收集垃圾,但你也可以手动调用垃圾收集器

© www.soinside.com 2019 - 2024. All rights reserved.