此代码可以运行
我得到一个空数组“链接”
虽然我应该得到一个包含对象的数组, 当我在 func_b 中打印链接时,它不为空
Self 应该引用对象,但我猜这不是多处理的情况,它似乎是某种副本
有谁知道解决办法吗?
谢谢
class class_b:
def __init__(self):
self.counter=0
self.sum=[]
class class_a:
import random as ran
from multiprocessing import Pool
import os
def __init__(self):
self.link=[0]*50
self.arr_a=[0]*100
def func_b(self,arg):
arr_key=self.ran.randint(0, 50)
if self.link[arr_key] ==0:
self.link[arr_key]=class_b()
else:
obj=self.link[arr_key]
obj.counter+=1
(obj.sum).append(self.ran.randint(0,10))
print(self.link)
def func_a(self):
with self.Pool(processes=self.os.cpu_count()) as pool:
pool.map(self.func_b,enumerate(self.arr_a))
print(self.link)
def task():
obj=class_a()
obj.func_a()
if __name__ == '__main__':
task()
使用多处理时,对象不会在主进程和工作进程之间共享 - 相反,每个参数作为参数传递给将在工作进程中运行的函数(例如,通过
Poll.map
调用),被序列化 - 即转换为字节流表示形式,并在工作进程中反序列化。如果简单地使用 multiprocessing.Process(target=...).start()
方法,甚至被调用函数的返回值也会被忽略。
但是,当使用
Pool.map
时,Python 会对其进行安排,以便目标工作线程上的 return 值被序列化并以透明的方式发送回原始进程。
因此,如果您只是更改子进程内方法以返回其计算结果,而不是尝试就地访问这些结果,您的代码就可以工作。
但是,对于这个特定的工作负载,您希望每次调用远程函数时,它都会尝试处理数据已经存在并由对等进程创建:
self.link
列表将独立存在于每个进程中 - 并且如果在worker 1中创建了一个密钥,它不会存在于worker[2]中 - 并且将创建一个新的class_b
实例。
对于不可避免的情况,Python 的 stdlib 提供了 “Manager” 类 - 这将允许列表代理对象的行为就像在每个工作人员之间共享一样(实际上,当工作人员访问列表,它是隐式序列化的 - 在该对象中反序列化 - 在您的情况下,如果worker
1将其放入列表中,则将在worker[2]中创建
class_b
的新实例,并且存在复杂性除非人们非常清楚他们在做什么(例如,必须使用检索到的 class_b 实例更新回相同的列表项,否则经理将无法知道它已在 worker[2]
中发生更改。另一个例子:会有竞争条件:如果worker[2]和worker[3]同时检索一个class_b实例,则最后更新它的人“获胜” - 解决这个问题的方法是使用锁 -代码变得更加复杂)。
体验
multiprocessing.Manager
并进行一些试验和错误以了解其工作方式对您来说是有好处的 - 特别是如果此代码代表您将拥有的真实用例。然而,在这个答案中,我并没有探索这条道路,而是选择了一个更简单的选项,不太容易出现复杂的边缘情况。
总而言之,使用并发时的理想方法是尝试构建算法,以便每个工作人员都可以以独立的方式完成其工作 - 对于您的工作负载,在主进程中合并多个
class_b
实例可能是最简单的。
请注意,尽管合并所有结果的工作可能并不简单(并且您的示例代码恰好有一些必须考虑的特殊性),但这种方法仍然比使用基于“管理器”的方法更安全。
class class_b:
def __init__(self):
self.counter=0
self.sum=[]
@classmethod
def consolidate(cls, instances: "iterable[class_b]"):
self = cls() # creates an empty "class_b"
dropped_first = False
for instance in instances:
# to preserve the behavior of your original code,
# of an instance not getting a new number in "sum"
# when it is first created, I will frop a "counter"
# number, and the first number for the first processed instance
if not dropped_first:
instance.counter -= 1
instance.sum.pop(0)
dropped_first = True
self.counter += instance.counter
self.sum.extend(instance.sum)
return self
class class_a:
import random as ran
from multiprocessing import Pool
import os
def __init__(self):
self.link=[0]*50
self.arr_a=[0]*100
def func_b(self,arg):
arr_key=self.ran.randint(0, 50)
if self.link[arr_key] ==0:
self.link[arr_key]=class_b()
# here I modify your code: in the original the first
# time an item is reached, it doesn't get a new "sum" number:
# I create one each time, and this first
# number is discarded when consolidating the results
obj=self.link[arr_key]
obj.counter+=1
obj.sum.append(self.ran.randint(0,10))
print(self.link)
return self.link # return the list local for this process.
def func_a(self):
with self.Pool(processes=self.os.cpu_count()) as pool:
all_results = pool.map(self.func_b,enumerate(self.arr_a))
new_links = []
for items_at_index in zip(all_results):
# in each interation "item_at_index" will have a slice
# of all returned "link" lists for that index (that is what `zip` does)
items_with_results = [item for item in items_at_index if item is not 0]
if items_with_results:
new_links.append(class_b.consolidate(items_with_results))
else:
new_links.append(0)
self.link = new_links
print(self.link)
def task():
obj=class_a()
obj.func_a()
if __name__ == '__main__':
task()