带有类对象的Python多处理池

问题描述 投票:0回答:1

此代码可以运行

我得到一个空数组“链接”

虽然我应该得到一个包含对象的数组, 当我在 func_b 中打印链接时,它不为空

Self 应该引用对象,但我猜这不是多处理的情况,它似乎是某种副本

有谁知道解决办法吗?

谢谢

class class_b:
     
     def __init__(self):
          
        self.counter=0
        self.sum=[]

class class_a:

    import random as ran
    from multiprocessing import Pool
    import os

    def __init__(self):

        self.link=[0]*50
        self.arr_a=[0]*100

    def func_b(self,arg):
        
        arr_key=self.ran.randint(0, 50)

        if self.link[arr_key] ==0:

            self.link[arr_key]=class_b()

        else:

            obj=self.link[arr_key]
            obj.counter+=1
            (obj.sum).append(self.ran.randint(0,10))
        
        print(self.link)

        

    def func_a(self):
        
        with self.Pool(processes=self.os.cpu_count()) as pool:

                pool.map(self.func_b,enumerate(self.arr_a))

        print(self.link)

def task():

    obj=class_a()
    obj.func_a()

if __name__ == '__main__':
    task()
python class object multiprocessing multicore
1个回答
0
投票

使用多处理时,对象不会在主进程和工作进程之间共享 - 相反,每个参数作为参数传递给将在工作进程中运行的函数(例如,通过

Poll.map
调用),被序列化 - 即转换为字节流表示形式,并在工作进程中反序列化。如果简单地使用
multiprocessing.Process(target=...).start()
方法,甚至被调用函数的返回值也会被忽略。

但是,当使用

Pool.map
时,Python 会对其进行安排,以便目标工作线程上的 return 值被序列化并以透明的方式发送回原始进程。

因此,如果您只是更改子进程内方法以返回其计算结果,而不是尝试就地访问这些结果,您的代码就可以工作。

但是,对于这个特定的工作负载,您希望每次调用远程函数时,它都会尝试处理数据已经存在并由对等进程创建:

self.link
列表将独立存在于每个进程中 - 并且如果在worker 1中创建了一个密钥,它不会存在于worker[2]中 - 并且将创建一个新的
class_b
实例。

对于不可避免的情况,Python 的 stdlib 提供了 “Manager” 类 - 这将允许列表代理对象的行为就像在每个工作人员之间共享一样(实际上,当工作人员访问列表,它是隐式序列化的 - 在该对象中反序列化 - 在您的情况下,如果worker

1
将其放入列表中,则将在worker[2]中创建class_b的新实例,并且存在复杂性除非人们非常清楚他们在做什么(例如,必须使用检索到的 class_b 实例更新回相同的列表项,否则经理将无法知道它已在
worker[2]
中发生更改。另一个例子:会有竞争条件:如果worker[2]和worker[3]同时检索一个class_b实例,则最后更新它的人“获胜” - 解决这个问题的方法是使用锁 -代码变得更加复杂)。

体验

multiprocessing.Manager
并进行一些试验和错误以了解其工作方式对您来说是有好处的 - 特别是如果此代码代表您将拥有的真实用例。然而,在这个答案中,我并没有探索这条道路,而是选择了一个更简单的选项,不太容易出现复杂的边缘情况。

总而言之,使用并发时的理想方法是尝试构建算法,以便每个工作人员都可以以独立的方式完成其工作 - 对于您的工作负载,在主进程中合并多个

class_b
实例可能是最简单的。

请注意,尽管合并所有结果的工作可能并不简单(并且您的示例代码恰好有一些必须考虑的特殊性),但这种方法仍然比使用基于“管理器”的方法更安全。


class class_b:
     
     def __init__(self):
          
        self.counter=0
        self.sum=[]

@classmethod
def consolidate(cls, instances: "iterable[class_b]"):
    self = cls()  # creates an empty "class_b"
    dropped_first = False
    for instance in instances:
        #  to preserve the behavior of your original code,
        # of an instance not getting a new number in "sum"
        # when it is first created, I will frop a "counter" 
        # number, and the first number for the first processed instance
        if not dropped_first:
            instance.counter -= 1
            instance.sum.pop(0)
            dropped_first = True
        self.counter += instance.counter
        self.sum.extend(instance.sum)
    
    return self

class class_a:

    import random as ran
    from multiprocessing import Pool
    import os

    def __init__(self):

        self.link=[0]*50
        self.arr_a=[0]*100

    def func_b(self,arg):
        
        arr_key=self.ran.randint(0, 50)

        if self.link[arr_key] ==0:

            self.link[arr_key]=class_b()

        # here I modify your code: in the original the first 
        # time an item is reached, it doesn't get a new "sum" number:
        # I create one each time, and this first 
        # number is discarded when consolidating the results
        obj=self.link[arr_key]
        obj.counter+=1
        obj.sum.append(self.ran.randint(0,10))
    
        print(self.link)
        return self.link  # return the list local for this process.

        

    def func_a(self):
        
        with self.Pool(processes=self.os.cpu_count()) as pool:

            all_results = pool.map(self.func_b,enumerate(self.arr_a))
        new_links = []
        for items_at_index in zip(all_results):
            # in each interation "item_at_index" will have a slice
            # of all returned "link" lists for that index (that is what `zip` does)
            items_with_results = [item for item in items_at_index if item is not 0]
            if items_with_results:
                new_links.append(class_b.consolidate(items_with_results))
            else:
                new_links.append(0)

        self.link = new_links

        print(self.link)

def task():

    obj=class_a()
    obj.func_a()

if __name__ == '__main__':
    task()




© www.soinside.com 2019 - 2024. All rights reserved.