无法使用多重处理更改嵌套对象中的数据

问题描述 投票:0回答:2

我正在编写一个大程序作为个人项目,其中我有嵌套对象。例如,世界包含多个环境,每个环境包含多个人。我想做的是,我试图将这些人的特征从人物类返回到环境类,再返回到世界类,然后返回到主程序以显示所述结果。我正在使用多重处理来运行每个 People 对象,其中的特征会主动变化。

所附代码是问题的较小版本。

Type1
生成
Type2
,其中
Type2
更改其值,然后
Type1
值根据
Type2
更改而更改,但
Type1
实例不会检测到
Type2
中的更改。我尝试使用锁、管理器、值、队列和同步管理器(全部来自多处理),但似乎都不起作用。我是否遗漏了一些明显的东西?或者是否有不同的首选程序结构来实现我想做的事情?

我的预期输出是根据

Type1
Type2
中的
change()
函数将其特定数据类型值更改为
Type1
Type2

尝试根据下面答案中的建议使用管理器并更改 dict 会导致 none 类型,这表明 shared_var 未在不同对象的进程之间正确共享。

import multiprocessing import time class Type1: def __init__(self) -> None: self.array =[] self.dict = {} self.text = "" self.number = 0 self.process_dict = {} self.type2_dict = {} self.num = 0 def start(self): new_type = Type2() p = multiprocessing.Process(target=new_type.change) self.process_dict[f"type2{self.num}"] = p self.type2_dict[f"type2{self.num}"] = new_type self.num += 1 p.start() def stop(self): while len(self.process_dict) > 0: _, process = self.process_dict.popitem() process.terminate() process.join() def change(self): self.array, self.dict, self.text, self.number = self.type2_dict[f"type2{self.num-1}"].get_data() def print(self): print(self.array) print(self.dict) print(self.text) print(self.number) print(self.process_dict) class Type2: def __init__(self) -> None: self.array =[] self.dict = {} self.text = "Hello" self.number = 0 def change(self): while True: self.array = [6,7,8,9,10] self.dict = {"d":4,"e":5,"f":6} self.text = "Goodbye" self.number += 1 print("Type2 changed") def get_data(self): return self.array, self.dict, self.text, self.number if __name__ == "__main__": t = Type1() t.start() time.sleep(2) print("\n\nType1 After Start:") t.change() t.print() t.stop() print("\n\nType1 After Finish:") t.print() print("Type1 stopped")


python multiprocessing shared-variable
2个回答
0
投票
import multiprocessing import time class Type1: def __init__(self, array, dict, text, number) -> None: self.array = array self.dict = dict self.text = text self.number = number self.process_dict = {} self.type2_dict = {} self.num = 0 def start(self): new_type = Type2(self.array, self.dict, self.text, self.number) p = multiprocessing.Process(target=new_type.change, args=(self.dict,)) self.process_dict[f"type2{self.num}"] = p self.type2_dict[f"type2{self.num}"] = new_type self.num += 1 p.start() def stop(self): while len(self.process_dict) > 0: _, process = self.process_dict.popitem() process.terminate() process.join() def change(self): self.array, self.dict, self.text, self.number = self.type2_dict[f"type2{self.num-1}"].get_data() def print(self): print(self.array.value) print(self.dict.value) print(self.text.value) print(self.number.value) print(self.process_dict) class Type2: def __init__(self, array, dict, text, number) -> None: self.array = array self.dict = dict self.text = text self.number = number def change(self, shared_var): while True: self.array = [6,7,8,9,10] self.text = "Goodbye" self.number = 1 self.dict = shared_var # increase each number in the dictionary by 1 for key in self.dict.keys(): self.dict[key] += 1 # shared_var = self.dict # print("Type2 changed") time.sleep(0.01) def get_data(self): return self.array, self.dict, self.text, self.number if __name__ == "__main__": manager = multiprocessing.Manager() array = manager.list([1,2,3,4,5]) dict = manager.dict({"a":1,"b":2,"c":3}) text = manager.Value("s", "Hello") number = manager.Value("i", 0) t = Type1(array, dict, text, number) t.start() print("\n\nType1 After Start:") t.change() t.print() time.sleep(2) t.stop() print("\n\nType1 After Finish:") t.print() print("Type1 stopped")

,您可以在主线程中共享多个管理器变量,并在主线程不结束的情况下在进程内访问它们,所以我在第二个进程中使用

multiprocessing.Manager()
主线不掉。
process2.join()

在示例中,process0 会每 0.01 更新一次管理器变量+1,而 process2 只会每 1 秒通知控制台一次更新,其结果是:

import multiprocessing import time def worker(shared_var, update_var): while True: # Simulate some work if update_var: # Send data to the main process through the queue current_value = shared_var.value new_value = current_value + 1 shared_var.value = new_value time.sleep(0.01) else: current_value = shared_var.value print("Main Process:", current_value) time.sleep(1) if __name__ == "__main__": # Create a multiprocessing queue manager = multiprocessing.Manager() shared_var = manager.Value('i', 0) # Create a multiprocessing process process = multiprocessing.Process(target=worker, args=(shared_var, True, )) process.start() process2 = multiprocessing.Process(target=worker, args=(shared_var, False, )) process2.start() process2.join()

进程间互不中断,以主线程为桥梁进行通信。


0
投票

Main Process: 2 Main Process: 65 Main Process: 129 Main Process: 192 Main Process: 254 Main Process: 317 Main Process: 381 ...

但是 
dict = manager.dict({"a":1,"b":2,"c":3})

是一个内置类,您刚刚无意中覆盖了其定义,因此如果存在诸如

dict
之类的任何代码,它将不再起作用。不要重新定义内置函数、类等
让我们从第二个编码示例开始。您似乎了解 

d = dict(x=1, y=2)

,它是

multiprocessing.Manager
的子类,它定义了您正在使用的一组
driven
类,但没有效果。所提到的问题是您的 multiprocessing.managers.BaseManager 实例正在子进程
 中执行,并带有您的 
Type2 实例 Type2
 所引用的 
Type1
 实例的副本
。因此,当调用
t
方法时,它会更新子进程的
Type2.change
实例副本,并且
Type2
引用的
Type2
实例保持不变。
你的

t

类相当简单,Python允许我们基于这个类很容易地创建一个新的托管类。以下代码注册了

Type2
Type2Manager
的子类,一种新的托管类型
multiprocessing.managers.BaseManager
,我简化了您的示例来演示这一点。请参阅
this
(向下滚动并特别阅读自定义管理器部分): Type2

打印:

import multiprocessing from multiprocessing.managers import BaseManager import time class Type2Manager(BaseManager): pass class Type1: def __init__(self, manager: Type2Manager) -> None: self.manager = manager self.array = [] self.dict = {} self.text = "" self.number = 0 def change(self): type2 = self.manager.Type2() p = multiprocessing.Process(target=type2.change) p.start() p.join() # Wait for change to complete self.array, self.dict, self.text, self.number = type2.get_data() def print(self): print("array:", self.array) print("dict:", self.dict) print("text:", self.text) print("number:", self.number) class Type2: def __init__(self) -> None: self.array = [] self.dict = {} self.text = "Hello" self.number = 0 def change(self): self.array = [6, 7, 8, 9, 10] self.dict = {"d": 4, "e": 5, "f": 6} self.text = "Goodbye" self.number = 1 def get_data(self): return self.array, self.dict, self.text, self.number Type2Manager.register('Type2', Type2) if __name__ == "__main__": with Type2Manager() as manager: t = Type1(manager) # Before calling t.change(): print("Type1 Before Change:") t.print() t.change() print("\n\nType1 After Change:") t.print()

© www.soinside.com 2019 - 2024. All rights reserved.