我有不同的Python类,它们都有一个名为push的方法。我正在努力的是
我尝试了多种方法使用
#from pathos.multiprocessing import ProcessingPool as Pool
from multiprocessing import Pool
def push_wrapper(obj):
obj.push()
class_objects=[class1_obj1,class2_obj2,class3_obj3]
with Pool(processes=max_thread_count) as pool:
pool.map(push_wrapper, class_objects)
pool.close()
pool.join()
出现此错误
raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
还有一些其他方法,例如使用
pool.apply_async
,但它们并不等待所有方法立即完成退出。当我添加 job.wait()
和 pool.apply_async
时,它等待所有线程完成,但我想在线程完成时打印线程的结果
让我知道解决此用例的最佳方法是什么
您想要传递类的工作人员实例,然后让他们调用每个实例的
push
方法。类不可序列化(Python 内部使用 stdlib 的 pickle
来实现),并且不能作为参数传递给工作人员。
另一方面,常规类的实例可以传递(请注意,由本机代码中的扩展创建的特殊类可能没有 pickle 支持 - 但可以实现 - 检查 pickle 协议文档) - 然后,所有工作人员需要做的就是调用每个方法的
push
方法。
TL;DR:你应该让它工作改变这一行:
pool.map(push_wrapper, (cls().push for cls in class_objects))
...
请注意,表达式
(cls().push for cls in class_objects)
是一个生成器表达式 - 因此是 pool.map
可接受的迭代 - 并且对于类序列中的每个元素,创建一个实例 (cls()
),并将其 .push
方法引用为可在工作线程中运行的目标。
在底层,子处理(或任何嵌入式库)将序列化刚刚创建的实例(如果您需要在根进程中引用实例,请根据需要修改代码),以及对其的引用
push
方法。