我正在编写一个大程序作为个人项目,其中我有嵌套对象。例如,世界包含多个环境,每个环境包含多个人。我想做的是,我试图将这些人的特征从人物类返回到环境类,再返回到世界类,然后返回到主程序以显示所述结果。我正在使用多重处理来运行每个 People 对象,其中的特征会主动变化。
所附代码是问题的较小版本。
Type1
生成 Type2
,其中 Type2
更改其值,然后 Type1
值根据 Type2
更改而更改,但 Type1
实例不会检测到 Type2
中的更改。我尝试使用锁、管理器、值、队列和同步管理器(全部来自多处理),但似乎都不起作用。我是否遗漏了一些明显的东西?或者是否有不同的首选程序结构来实现我想做的事情?
我的预期输出是根据
Type1
和Type2
中的
change()
函数将其特定数据类型值更改为
Type1
Type2
尝试根据下面答案中的建议使用管理器并更改 dict 会导致 none 类型,这表明 shared_var 未在不同对象的进程之间正确共享。
import multiprocessing
import time
class Type1:
def __init__(self) -> None:
self.array =[]
self.dict = {}
self.text = ""
self.number = 0
self.process_dict = {}
self.type2_dict = {}
self.num = 0
def start(self):
new_type = Type2()
p = multiprocessing.Process(target=new_type.change)
self.process_dict[f"type2{self.num}"] = p
self.type2_dict[f"type2{self.num}"] = new_type
self.num += 1
p.start()
def stop(self):
while len(self.process_dict) > 0:
_, process = self.process_dict.popitem()
process.terminate()
process.join()
def change(self):
self.array, self.dict, self.text, self.number = self.type2_dict[f"type2{self.num-1}"].get_data()
def print(self):
print(self.array)
print(self.dict)
print(self.text)
print(self.number)
print(self.process_dict)
class Type2:
def __init__(self) -> None:
self.array =[]
self.dict = {}
self.text = "Hello"
self.number = 0
def change(self):
while True:
self.array = [6,7,8,9,10]
self.dict = {"d":4,"e":5,"f":6}
self.text = "Goodbye"
self.number += 1
print("Type2 changed")
def get_data(self):
return self.array, self.dict, self.text, self.number
if __name__ == "__main__":
t = Type1()
t.start()
time.sleep(2)
print("\n\nType1 After Start:")
t.change()
t.print()
t.stop()
print("\n\nType1 After Finish:")
t.print()
print("Type1 stopped")
import multiprocessing
import time
class Type1:
def __init__(self, array, dict, text, number) -> None:
self.array = array
self.dict = dict
self.text = text
self.number = number
self.process_dict = {}
self.type2_dict = {}
self.num = 0
def start(self):
new_type = Type2(self.array, self.dict, self.text, self.number)
p = multiprocessing.Process(target=new_type.change, args=(self.dict,))
self.process_dict[f"type2{self.num}"] = p
self.type2_dict[f"type2{self.num}"] = new_type
self.num += 1
p.start()
def stop(self):
while len(self.process_dict) > 0:
_, process = self.process_dict.popitem()
process.terminate()
process.join()
def change(self):
self.array, self.dict, self.text, self.number = self.type2_dict[f"type2{self.num-1}"].get_data()
def print(self):
print(self.array.value)
print(self.dict.value)
print(self.text.value)
print(self.number.value)
print(self.process_dict)
class Type2:
def __init__(self, array, dict, text, number) -> None:
self.array = array
self.dict = dict
self.text = text
self.number = number
def change(self, shared_var):
while True:
self.array = [6,7,8,9,10]
self.text = "Goodbye"
self.number = 1
self.dict = shared_var
# increase each number in the dictionary by 1
for key in self.dict.keys():
self.dict[key] += 1
# shared_var = self.dict
# print("Type2 changed")
time.sleep(0.01)
def get_data(self):
return self.array, self.dict, self.text, self.number
if __name__ == "__main__":
manager = multiprocessing.Manager()
array = manager.list([1,2,3,4,5])
dict = manager.dict({"a":1,"b":2,"c":3})
text = manager.Value("s", "Hello")
number = manager.Value("i", 0)
t = Type1(array, dict, text, number)
t.start()
print("\n\nType1 After Start:")
t.change()
t.print()
time.sleep(2)
t.stop()
print("\n\nType1 After Finish:")
t.print()
print("Type1 stopped")
,您可以在主线程中共享多个管理器变量,并在主线程不结束的情况下在进程内访问它们,所以我在第二个进程中使用
multiprocessing.Manager()
主线不掉。process2.join()
在示例中,process0 会每 0.01 更新一次管理器变量+1,而 process2 只会每 1 秒通知控制台一次更新,其结果是:
import multiprocessing
import time
def worker(shared_var, update_var):
while True:
# Simulate some work
if update_var:
# Send data to the main process through the queue
current_value = shared_var.value
new_value = current_value + 1
shared_var.value = new_value
time.sleep(0.01)
else:
current_value = shared_var.value
print("Main Process:", current_value)
time.sleep(1)
if __name__ == "__main__":
# Create a multiprocessing queue
manager = multiprocessing.Manager()
shared_var = manager.Value('i', 0)
# Create a multiprocessing process
process = multiprocessing.Process(target=worker, args=(shared_var, True, ))
process.start()
process2 = multiprocessing.Process(target=worker, args=(shared_var, False, ))
process2.start()
process2.join()
Main Process: 2
Main Process: 65
Main Process: 129
Main Process: 192
Main Process: 254
Main Process: 317
Main Process: 381
...
但是
dict = manager.dict({"a":1,"b":2,"c":3})
是一个内置类,您刚刚无意中覆盖了其定义,因此如果存在诸如
dict
之类的任何代码,它将不再起作用。不要重新定义内置函数、类等让我们从第二个编码示例开始。您似乎了解 d = dict(x=1, y=2)
,它是
multiprocessing.Manager
的子类,它定义了您正在使用的一组 driven类,但没有效果。所提到的问题是您的
multiprocessing.managers.BaseManager
实例正在子进程 中执行,并带有您的
Type2
实例 Type2
所引用的
Type1
实例的副本。因此,当调用
t
方法时,它会更新子进程的 Type2.change
实例副本,并且 Type2
引用的 Type2
实例保持不变。你的t
类相当简单,Python允许我们基于这个类很容易地创建一个新的托管类。以下代码注册了
Type2
,Type2Manager
的子类,一种新的托管类型multiprocessing.managers.BaseManager
,我简化了您的示例来演示这一点。请参阅this(向下滚动并特别阅读自定义管理器部分):
Type2
打印:
import multiprocessing
from multiprocessing.managers import BaseManager
import time
class Type2Manager(BaseManager):
pass
class Type1:
def __init__(self, manager: Type2Manager) -> None:
self.manager = manager
self.array = []
self.dict = {}
self.text = ""
self.number = 0
def change(self):
type2 = self.manager.Type2()
p = multiprocessing.Process(target=type2.change)
p.start()
p.join() # Wait for change to complete
self.array, self.dict, self.text, self.number = type2.get_data()
def print(self):
print("array:", self.array)
print("dict:", self.dict)
print("text:", self.text)
print("number:", self.number)
class Type2:
def __init__(self) -> None:
self.array = []
self.dict = {}
self.text = "Hello"
self.number = 0
def change(self):
self.array = [6, 7, 8, 9, 10]
self.dict = {"d": 4, "e": 5, "f": 6}
self.text = "Goodbye"
self.number = 1
def get_data(self):
return self.array, self.dict, self.text, self.number
Type2Manager.register('Type2', Type2)
if __name__ == "__main__":
with Type2Manager() as manager:
t = Type1(manager)
# Before calling t.change():
print("Type1 Before Change:")
t.print()
t.change()
print("\n\nType1 After Change:")
t.print()