The program runs fine until the size of the to_sum list reaches roughly 1150. Beyond that, the processes hang at the first point where task_queue = result_queue. They successfully fill the result queue and terminate, but then the program hangs. If the array size stays below 1150, the problem does not occur.
Restarting the computer sometimes lets the program handle a larger array before hanging, but the threshold always falls somewhere in the 1100-1300 range.
Do you know what could be causing this?
import multiprocessing


class CustomProcess(multiprocessing.Process):
    def __init__(self, name, task_queue, result_queue, lock, chunks=2, *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.name = name
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.chunks = chunks
        self.lock = lock

    def run(self):
        while True:
            """
            Using a lock to avoid a race condition where two processes each get a number, then
            each tries to get another, but it is None, so both put their number into the result
            queue, leaving it larger than expected. For example:
            Expected result_queue size = 500, current size 499; after summing 1 and 2 we add 3
            to the result queue, reaching 500.
            With the race condition the result is 501:
            [1, 2, None, None]
            Process 1 gets 1.
            Process 2 gets 2.
            Process 1 gets None and puts 1 in the result queue instead of getting 2 and summing.
            Process 2 gets None and puts 2 in the result queue instead of just getting the first None and returning.
            A lock around both gets removes the race condition.
            """
            with self.lock:
                if not self.task_queue.empty():
                    number_1 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_1 is None:
                        # Poison pill - terminate.
                        print(f"Terminated {self.name}")
                        return
                else:
                    # Queue empty - terminate.
                    return
                if not self.task_queue.empty():
                    number_2 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_2 is None:
                        # Cannot compute the sum of one number, so just add number_1 to
                        # result_queue and terminate, since a poison pill was acquired.
                        self.result_queue.put(number_1)
                        print(f"Terminated {self.name}")
                        return
                else:
                    # Queue empty, put the one number in the result queue and terminate.
                    self.result_queue.put(number_1)
                    return
            self.result_queue.put(number_1 + number_2)


def multiprocess_sum(array):
    if len(array) == 1:
        return array[0]
    lock = multiprocessing.Lock()
    task_queue = multiprocessing.JoinableQueue()
    [task_queue.put(element) for element in array]
    task_queue_size = len(array)
    while task_queue_size > 1:
        print(task_queue.qsize(), task_queue_size)
        result_queue = multiprocessing.JoinableQueue()
        processes = [CustomProcess(name=str(i), task_queue=task_queue, result_queue=result_queue, lock=lock)
                     for i in range(8)]
        [task_queue.put(None) for process in processes]
        [process.start() for process in processes]
        #[process.join() for process in processes]
        task_queue.join()
        task_queue = result_queue
        task_queue_size = task_queue_size // 2 + task_queue_size % 2
    return result_queue.get()


if __name__ == "__main__":
    to_sum = [i for i in range(1350)]
    """
    If the range is below 1200, the program runs and computes everything correctly.
    If it is above that, it hangs at the first halving, the moment the first task_queue is empty and the
    result_queue becomes the new task_queue.
    A computer restart makes the threshold fluctuate; yesterday it would hang at 1177 but run fine up to 1776.
    Queue pipe full???
    """
    print(sum(to_sum))
    for i in range(5):
        print(multiprocess_sum(to_sum))
Too long for a comment, so:
First, calling empty and qsize on multiprocessing queues is unreliable and should not be done (read the docs). Second, the lock you are using prevents any real multiprocessing from happening, since most of the work done in your run method executes serially. Third, summing 1350 numbers takes 1349 additions (how could it be otherwise?). So just split the 1350 numbers into 8 lists that are as nearly equal as possible, have each process sum its own list and return the result, and then add the 8 returned values for the final total.
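To illustrate the first point: the race-free pattern is a blocking get with a sentinel, so there is no empty()/get() window that needs a lock in the first place. A minimal sketch (the worker and its placeholder "work" are hypothetical, not code from the question):

import multiprocessing

SENTINEL = None  # hypothetical end-of-work marker, one per worker

def worker(task_queue, result_queue):
    # A blocking get() delivers each item (or sentinel) to exactly one
    # process, so no empty()-then-get() gap exists and no lock is needed.
    while True:
        item = task_queue.get()
        if item is SENTINEL:
            return
        result_queue.put(item * 2)  # placeholder work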
As the size of to_sum grows, the final addition of the 8 partial sums contributes a negligible fraction of the total running time.
import multiprocessing


class CustomProcess(multiprocessing.Process):
    def __init__(self, name, task_queue, result_queue, *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        self.result_queue.put(sum(self.task_queue.get()))


def split(iterable, n):  # split iterable into n (nearly) even parts
    if type(iterable) is range and iterable.step != 1:
        # algorithm doesn't work with steps other than 1:
        iterable = list(iterable)
    l = len(iterable)
    n = min(l, n)
    k, m = divmod(l, n)
    return (iterable[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))


N_PROCESSES = 8


def multiprocess_sum(l):
    if len(l) == 1:
        return l[0]
    task_queue = multiprocessing.Queue()
    lists = split(l, N_PROCESSES)
    for l in lists:
        task_queue.put(l)
    result_queue = multiprocessing.Queue()
    processes = [
        CustomProcess(name=str(i), task_queue=task_queue, result_queue=result_queue)
        for i in range(N_PROCESSES)
    ]
    for process in processes:
        process.start()
    the_sum = sum(result_queue.get() for _ in range(N_PROCESSES))
    for process in processes:
        process.join()
    return the_sum


if __name__ == "__main__":
    to_sum = list(range(1350))
    print(sum(to_sum))
    for i in range(5):
        print(multiprocess_sum(to_sum))
Prints:
910575
910575
910575
910575
910575
910575
I wonder how large the to_sum list would have to be before using multiprocessing on this particular problem actually saves time.
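One way to find out is to time both versions over increasing sizes. A minimal sketch, assuming the multiprocess_sum defined in the answer above is in scope (the size grid and repeat count are arbitrary):

import time

def bench(fn, data, repeats=3):
    # Best wall-clock time over a few repeats, to dampen noise.
    best = float("inf")
    for _ in range(repeats):
        start = time.perf_counter()
        fn(data)
        best = min(best, time.perf_counter() - start)
    return best

if __name__ == "__main__":
    # multiprocess_sum: the splitting version from the answer above (assumed in scope).
    for size in (10_000, 100_000, 1_000_000, 10_000_000):
        data = list(range(size))
        print(f"{size:>10}: serial {bench(sum, data):.4f}s, "
              f"parallel {bench(multiprocess_sum, data):.4f}s")

Expect the serial version to win at small sizes, since process startup and pickling the chunks carry a fixed overhead that the parallel additions have to amortize.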
Solved the problem by using a manager. A manager.Queue is a proxy to a queue living in a separate server process, so items never pass through the limited OS pipe that a plain multiprocessing.Queue flushes through via a background feeder thread; the hang was most likely the deadlock documented in the multiprocessing programming guidelines, where a process cannot finish until all of its buffered queue items have been flushed to the pipe.
import multiprocessing

"""
Using a manager avoids the problem where the program will hang if the input is too big.
"""


class CustomProcess(multiprocessing.Process):
    def __init__(self, name, task_queue, result_queue, lock, chunks=2, *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.name = name
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.chunks = chunks
        self.lock = lock

    def run(self):
        while True:
            """
            Using a lock to avoid a race condition where two processes each get a number, then
            each tries to get another, but it is None, so both put their number into the result
            queue, leaving it larger than expected. For example:
            Expected result_queue size = 500, current size 499; after summing 1 and 2 we add 3
            to the result queue, reaching 500.
            With the race condition the result is 501:
            [1, 2, None, None]
            Process 1 gets 1.
            Process 2 gets 2.
            Process 1 gets None and puts 1 in the result queue instead of getting 2 and summing.
            Process 2 gets None and puts 2 in the result queue instead of just getting the first None and returning.
            A lock around both gets removes the race condition.
            """
            with self.lock:
                if not self.task_queue.empty():
                    number_1 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_1 is None:
                        # Poison pill - terminate.
                        print(f"Terminated {self.name}")
                        return
                else:
                    # Queue empty - terminate.
                    return
                if not self.task_queue.empty():
                    number_2 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_2 is None:
                        # Cannot compute the sum of one number, so just add number_1 to
                        # result_queue and terminate, since a poison pill was acquired.
                        self.result_queue.put(number_1)
                        print(f"Terminated {self.name}")
                        return
                else:
                    # Queue empty, put the one number in the result queue and terminate.
                    self.result_queue.put(number_1)
                    return
            self.result_queue.put(number_1 + number_2)


def multiprocess_sum(array):
    if len(array) == 1:
        return array[0]
    lock = multiprocessing.Lock()
    with multiprocessing.Manager() as manager:
        task_queue = manager.Queue()
        [task_queue.put(element) for element in array]
        task_queue_size = len(array)
        while task_queue_size > 1:
            result_queue = manager.Queue()
            processes = [CustomProcess(name=str(i), task_queue=task_queue, result_queue=result_queue, lock=lock)
                         for i in range(8)]
            [task_queue.put(None) for process in processes]
            [process.start() for process in processes]
            #[process.join() for process in processes]
            task_queue.join()
            task_queue = result_queue
            task_queue_size = task_queue_size // 2 + task_queue_size % 2
        return result_queue.get()


if __name__ == "__main__":
    to_sum = [i for i in range(2000)]
    print(sum(to_sum))
    for i in range(5):
        print(multiprocess_sum(to_sum))
I have to agree with user Booboo that this particular implementation is not the best (it may in fact be the worst) for a high-performance parallel sum function. If you are looking for a parallel sum function, read his answer. If you are interested in why your program hangs when faced with large queues, using a manager should solve your problem. If you have an implementation of a parallel reduce function (one that handles the poison-pill race condition better), please share it to improve on the implementation given here for those who are specifically looking for one (a rough sketch of one possible shape follows below). Regards, Tari
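For reference, a minimal sketch of one such direction (this is not code from either answer; the pairwise-rounds scheme and the use of multiprocessing.Pool are assumptions): reduce the list in rounds through a pool, which avoids hand-rolled queues and poison pills altogether.

import multiprocessing

def _chunk_sum(chunk):
    # Sum one chunk (one or two numbers) from the current round.
    return sum(chunk)

def parallel_reduce(values, processes=8):
    # Repeatedly halve the list: each round sums adjacent pairs in parallel,
    # mirroring the halving loop above but without queues or poison pills.
    # Assumes a non-empty input list.
    with multiprocessing.Pool(processes) as pool:
        while len(values) > 1:
            pairs = [values[i:i + 2] for i in range(0, len(values), 2)]
            values = pool.map(_chunk_sum, pairs)
    return values[0]

if __name__ == "__main__":
    print(parallel_reduce(list(range(1350))))  # 910575, matching the outputs above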