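I'm porting a library for a client who is very picky about external dependencies.
Most of the multiprocessing in this library is backed by the pathos ProcessPool module, mainly because it handles locally defined functions very easily.
I'm trying to recover some of that functionality without forcing this dependency on the client (and without rewriting large chunks of the library). I know the following code works, because the function is defined at the top level: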
import multiprocessing as mp

def f(x):
    return x * x

def main():
    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))

if __name__ == "__main__":
    main()
The following code (which is what I need to work) fails, because the function is defined only in a local scope:
import multiprocessing as mp

def main():
    def f(x):
        return x * x

    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))

if __name__ == "__main__":
    main()
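The failure comes from pickling: Pool.map has to pickle the function to send it to the workers, and pickle stores a function by its qualified name, which cannot be looked up for a function defined inside another function. A minimal sketch that reproduces this without a pool (is_picklable and make_local are names made up for illustration):

```python
import pickle


def make_local():
    # Defined inside another function, so its qualified name is
    # "make_local.<locals>.local", which pickle cannot look up by name.
    def local(x):
        return x * x
    return local


def is_picklable(obj):
    """Return True if pickle.dumps(obj) succeeds (hypothetical helper)."""
    try:
        pickle.dumps(obj)
    except (AttributeError, pickle.PicklingError, TypeError):
        return False
    return True


if __name__ == "__main__":
    local = make_local()
    print(local(3))             # the function itself works fine
    print(is_picklable(local))  # but it cannot be pickled
```

The exact exception type may differ between Python versions, which is why the helper catches several.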
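In my case (not the toy example above), the objects in my generator cannot be marshaled.
What is a good workaround for this specific use case that doesn't require an external dependency?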
The main problem is closure variables.
If the function doesn't have any, you can do this:
import marshal
import multiprocessing
import types
from functools import partial

def main():
    def internal_func(c):
        return c*c

    with multiprocessing.Pool(5) as pool:
        print(internal_func_map(pool, internal_func, [i for i in range(10)]))

def internal_func_map(pool, f, gen):
    # Ship the function's code object as bytes; run_func is top-level, so it pickles fine.
    marshaled = marshal.dumps(f.__code__)
    return pool.map(partial(run_func, marshaled=marshaled), gen)

def run_func(*args, **kwargs):
    # Runs in the worker: rebuild the function from its code object and call it.
    marshaled = kwargs.pop("marshaled")
    func = marshal.loads(marshaled)
    restored_f = types.FunctionType(func, globals())
    return restored_f(*args, **kwargs)

if __name__ == "__main__":
    main()
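The idea is that the function's code object contains everything needed to run it in a new process. Note that no external dependencies are required, only the Python standard library.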
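The core of that idea can be seen without any pool at all. The following is a minimal sketch of the round trip, assuming the function has no closure variables; make_square and rebuild are illustrative names that do not appear in the code above:

```python
import marshal
import types


def make_square():
    def square(c):  # no closure variables, so __code__ is self-sufficient
        return c * c
    return square


def rebuild(code_bytes):
    """Recreate a function from marshaled code bytes."""
    code = marshal.loads(code_bytes)
    return types.FunctionType(code, globals())


if __name__ == "__main__":
    # marshal.dumps returns plain bytes, which pickle handles without trouble.
    payload = marshal.dumps(make_square().__code__)
    restored = rebuild(payload)
    print(restored(7))
```

Keep in mind that the marshal format is specific to the Python version, which should not matter here since the workers run the same interpreter as the parent.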
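If you do need closures, the hardest part of this solution is actually creating them. (A closure is made of so-called "cells", which are not very easy to create from code...)
Here is working code that is somewhat more involved: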
import marshal
import multiprocessing
import pickle
import types
from functools import partial

class A:
    def __init__(self, a):
        self.a = a

def main():
    x = A(1)

    def internal_func(c):
        return x.a + c

    with multiprocessing.Pool(5) as pool:
        print(internal_func_map(pool, internal_func, [i for i in range(10)]))

def internal_func_map(pool, f, gen):
    closure = f.__closure__
    marshaled_func = marshal.dumps(f.__code__)
    # The cell contents must themselves be picklable.
    pickled_closure = pickle.dumps(tuple(x.cell_contents for x in closure))
    return pool.map(partial(run_func, marshaled_func=marshaled_func, pickled_closure=pickled_closure), gen)

def run_func(*args, **kwargs):
    marshaled_func = kwargs.pop("marshaled_func")
    func = marshal.loads(marshaled_func)
    pickled_closure = kwargs.pop("pickled_closure")
    closure = pickle.loads(pickled_closure)
    restored_f = types.FunctionType(func, globals(), closure=create_closure(func, closure))
    return restored_f(*args, **kwargs)

def create_closure(func, original_closure):
    # Generate a helper whose inner function closes over the same variable
    # names, then overwrite the resulting cells with the real values.
    indent = " " * 4
    closure_vars_def = f"\n{indent}".join(f"{name}=None" for name in func.co_freevars)
    closure_vars_ref = ",".join(func.co_freevars)
    dynamic_closure = "create_dynamic_closure"
    s = (f"""
def {dynamic_closure}():
    {closure_vars_def}
    def internal():
        {closure_vars_ref}
    return internal.__closure__
""")
    # Use an explicit namespace: names created by a bare exec() are not
    # reliably visible through locals() inside a function (notably on 3.13+).
    namespace = {}
    exec(s, namespace)
    created_closure = namespace[dynamic_closure]()
    for closure_var, value in zip(created_closure, original_closure):
        closure_var.cell_contents = value
    return created_closure

if __name__ == "__main__":
    main()
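On Python 3.8 and later, the exec-based create_closure above may not be necessary, because types.CellType can build cells directly. A minimal sketch under that assumption; make_adder and rebuild_with_cells are illustrative names, not part of the code above:

```python
import types


def make_adder():
    offset = 10

    def add(c):
        return offset + c  # "offset" is a closure variable
    return add


def rebuild_with_cells(code, closure_values):
    """Rebuild a function, creating one cell per closure value."""
    # The values must be in the same order as code.co_freevars.
    cells = tuple(types.CellType(value) for value in closure_values)
    return types.FunctionType(code, globals(), closure=cells)


if __name__ == "__main__":
    original = make_adder()
    # __closure__ is ordered like __code__.co_freevars.
    values = tuple(cell.cell_contents for cell in original.__closure__)
    restored = rebuild_with_cells(original.__code__, values)
    print(restored(5))
```

In the multiprocessing version, the code object would still travel through marshal and the values through pickle, exactly as in run_func; only the cell creation changes.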
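Hope this helps, or at least gives you some ideas on how to approach the problem!
Original answer
Disclaimer: this answer applies if you want to define the function locally for better code organization, but can accept it living in the global scope.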
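You can use the global keyword before defining the function. This solves the problem of pickling the function (since it is now a global function) while still letting you define it in a local scope. Note that this relies on the worker processes inheriting the parent's globals, as they do with the fork start method; with spawn, the workers re-import the module and will not find f.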
import multiprocessing as mp

def main():
    global f

    def f(x):
        return x * x

    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))

if __name__ == "__main__":
    main()
    print(f(4))  # Inner function is available here as well.
Output:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
16
Here is another example with several functions of the same name; each later definition overwrites the previous one.
import multiprocessing as mp

def main():
    global f

    def f(x):
        return x * x

    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))

def main2():
    global f

    def f(x):
        return x * x * x

    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))

if __name__ == "__main__":
    main()
    main2()
    print(f(4))  # Calls the f defined last, in main2.
Output:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
64
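Updated answer
Undo the global state after calling map. Thanks to @KCQs for the hint in the comments.
To make sure the global function doesn't cause problems for the rest of your code, you can simply add a del statement for it, which removes it from the global scope again.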
import multiprocessing as mp

def main():
    global f

    def f(x):
        return x * x

    with mp.Pool(5) as p:
        print(p.map(f, [i for i in range(10)]))
    del f  # Remove the name from the global scope again.

if __name__ == "__main__":
    main()
    print(f(4))  # Inner function is not available.
Output:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Traceback (most recent call last):
  File "<file>.py", line 25, in <module>
    print(f(4))
NameError: name 'f' is not defined
Python collects garbage automatically, but you can also invoke the garbage collector manually.