Manager, Lock
的帮助下在Python(版本3.7)中实现了SharedList
。我已将其用作使用多处理Process
函数调用创建的进程之间的共享对象。共享列表用于存储共享每个进程所生成的值/对象。用Python的Manager
的Lock
和multiprocessing
的SharedList
的实现class SharedList(object):
def __init__(self, limit):
self.manager = Manager()
self.results = self.manager.list([])
self.lock = Lock()
self.limit = limit
def append(self, new_value):
with self.lock:
if len(self.results) == self.limit:
return False
self.results.append(new_value)
return True
def list(self):
with self.lock:
return list(self.results).copy()
使用创建的来存储使用SharedList
multiprocessing
创建的多个进程的值>results = SharedList(limit)
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
new_process = Process(target=child_function, args=(results))
processes.append(new_process)
new_process.start()
for _process in processes:
_process.join()
for _process in processes:
_process.close()
child_function
的实现
while True: result = func() if not (results.append(result)): break
[在某些情况下,该实现有效,但是当我增加限制时,挂断了。我使用的处理器数量少于CPU数量,并且相同的实验仍然挂在相同的位置。是否有更好的方法来解决上述问题,我已经研究了不同的方法,例如使用Queue,但这不能按预期工作,挂断电话?
使用队列添加了以前的实现
使用队列执行results_out = []
manager = multiprocessing.Manager()
results = manager.Queue()
tasks = manager.Queue()
num_processes = min(process_count, limit)
processes = []
for i in range(num_processes):
new_process = multiprocessing.Process(target=child_function,
args=(tasks, results)
processes.append(new_process)
new_process.start()
sleep(5)
for i in range(limit):
tasks.put(0)
sleep(1)
for i in range(num_processes):
tasks.put(-1)
num_finished_processes = 0
while True:
new_result = results.get()
if new_result == -1:
num_finished_processes += 1
if num_finished_processes == num_processes:
break
else:
results_out.append(new_result)
for process in processes:
process.join()
for process in processes:
process.close()
在child_function
中
while True: task_val = tasks.get() if task_val < 0: results.put(-1) break else: result = func() results.put(result)
在发布此问题之前,我已经阅读了以下参考资料,但是我无法获得所需的输出。我同意,此代码导致了死锁状态,但是我无法在python中使用多处理来找到没有死锁的实现]更新
参考
根据建议,我可以使用Queue
修改
SharedList
class SharedList(object):
def __init__(self, limit):
self.manager = Manager()
self.tasks = self.manager.Queue()
self.results = self.manager.Queue()
self.limit = limit
self.no_of_process = min(process_count, limit)
def setup(self):
sleep(1)
for i in range(self.limit):
self.tasks.put(0)
sleep(1)
for i in range(self.no_of_process):
self.tasks.put(-1)
def append(self, new_value):
task_val = self.tasks.get()
if task_val < 0:
self.results.put(-1)
return False
else:
self.results.put(new_value)
return True
def list(self):
results_out = []
num_finished_processes = 0
while True:
new_result = self.results.get()
if new_result == -1:
num_finished_processes += 1
if num_finished_processes == self.no_of_process:
break
else:
results_out.append(new_result)
return results_out
此实现工作正常,但对实现进行了以下更改
results = SharedList(limit) num_processes = min(process_count, limit) processes = [] for i in range(num_processes): new_process = Process(target=child_function, args=(results)) processes.append(new_process) new_process.start() results.setup() for _process in processes: _process.join() for _process in processes: _process.close()
child_function
的实现
while True: result = func() if not (results.append(result)): break
但是,仍然经过几次迭代,最终陷入死锁,挂断了电话>]我已经在Manager(多重处理锁定)的帮助下在Python(3.7版)中实现了SharedList。我已将其用作使用多处理过程创建的过程之间的共享对象...