Multiprocessing reduce function hangs in Python when the task queue size exceeds 1200 elements

Question · votes: 0 · answers: 2

The program runs fine until the to_sum list reaches a size of roughly 1150 elements. Beyond that, the processes hang at the first task_queue = result_queue point: they successfully fill the result queue and terminate, but the program then hangs. The problem does not occur when the array is smaller than about 1150 elements. Restarting the computer sometimes lets the program handle a somewhat larger array before hanging, but the threshold always falls in the 1100-1300 range. Do you know what could cause this?

import multiprocessing

class CustomProcess(multiprocessing.Process):

    def __init__(self, name, task_queue, result_queue, lock, chunks=2, *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.name = name
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.chunks = chunks
        self.lock = lock

    def run(self):
        while True:
            """
            Using a lock to avoid a race condition where two processes each get a number,
            then each get another, but it is None, so each puts its lone number into the
            result queue, making the result queue bigger than expected. For example:
            Expected result_queue size = 500, current size 499; after summing 1 and 2 we
            add 3 to the result queue, reaching 500.
            With the race condition the result is 501:
            [1, 2, None, None]
            Process 1 gets 1.
            Process 2 gets 2.
            Process 1 gets None and puts 1 in the result queue instead of getting 2 and summing.
            Process 2 gets None and puts 2 in the result queue instead of just getting the first None and returning.
            A lock around both gets removes the race condition.
            """
            with self.lock:
                if not self.task_queue.empty():
                    number_1 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_1 is None:
                        #Poison pill - terminate.
                        print(f"Terminated {self.name}")
                        return
                else:
                    #Queue empty - terminate.
                    return
                if not self.task_queue.empty():
                    number_2 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_2 is None:
                        #Cannot compute sum of 1 number so just add number_1 to result_queue and terminate since poison pill
                        #acquired.
                        self.result_queue.put(number_1)
                        print(f"Terminated {self.name}")
                        return
                else:
                    self.result_queue.put(number_1)
                    #Queue empty, put the 1 number in result queue and terminate.
                    return
            self.result_queue.put(number_1 + number_2)


def multiprocess_sum(array):
    if len(array) == 1:
        return array[0]
    lock = multiprocessing.Lock()
    task_queue = multiprocessing.JoinableQueue()
    [task_queue.put(element) for element in array]
    task_queue_size = len(array)

    while task_queue_size > 1:
        print(task_queue.qsize(), task_queue_size)
        result_queue = multiprocessing.JoinableQueue()
        processes = [CustomProcess(name=str(i), task_queue=task_queue, result_queue=result_queue, lock=lock)
                    for i in range(8)]
        [task_queue.put(None) for process in processes]
        [process.start() for process in processes]
        #[process.join() for process in processes]
        task_queue.join()
        task_queue = result_queue
        task_queue_size = task_queue_size // 2 + task_queue_size % 2
    return result_queue.get()

if __name__ == "__main__":
    to_sum = [i for i in range(1350)]
    """
    If range is below 1200, the program will run and compute everything correctly.
    If it is above it, it will hang at the first halving, the moment the first task_queue is empty and the
    result_queue becomes the new task_queue. 
    Computer restart will make the range values fluctuate, yesterday it would hang at 1177 but run fine up to 1776.
    Queue pipe full???
    """
    print(sum(to_sum))
    for i in range(5):
        print(multiprocess_sum(to_sum))
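For what it's worth, the symptom matches a pitfall documented in the CPython multiprocessing programming guidelines ("Joining processes that use queues"): a child process that has put items on a multiprocessing.Queue will not terminate until its feeder thread has flushed every buffered item into the underlying pipe, so waiting on such a process before draining the queue can deadlock once the pipe buffer fills. A minimal sketch of the safe ordering, using a hypothetical worker function (not from the code above):

```python
import multiprocessing

def worker(q, n):
    # Hypothetical producer: puts more items than the pipe buffer can hold.
    for i in range(n):
        q.put(i)
    # This process cannot exit until its feeder thread has flushed all
    # buffered items into the pipe -- which blocks if nobody is reading.

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q, 100_000))
    p.start()
    # Drain the queue BEFORE joining the producer; calling p.join() first
    # can deadlock once the pipe buffer fills.
    items = [q.get() for _ in range(100_000)]
    p.join()
    print(len(items))  # 100000
```

The code above instead waits (via task_queue.join()) on processes that have put items into result_queue without draining it first, which fits the "hangs only above a certain size" behavior.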
Tags: python, multithreading, multiprocessing, python-multiprocessing, reduce
2 Answers

2 votes

Too long for a comment, so:

First, calling empty or qsize on a multiprocessing queue is unreliable and should not be used (read the docs). Second, the lock you are using prevents any real multiprocessing from happening, because most of the work done in your run method executes serially. Third, adding 1,350 numbers requires 1,349 additions (how could it be otherwise?). So just split the 1,350 numbers as evenly as possible into 8 lists, have each process sum its list and return the result, and then add the 8 returned values to get the final total.

As the size of to_sum grows, the final addition of the 8 partial sums contributes a negligible fraction of the total running time.

import multiprocessing

class CustomProcess(multiprocessing.Process):

    def __init__(self, name, task_queue, result_queue, *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        self.result_queue.put(sum(self.task_queue.get()))

def split(iterable, n):  # function to split iterable in n even parts
    if type(iterable) is range and iterable.step != 1:
        # algorithm doesn't work with steps other than 1:
        iterable = list(iterable)
    l = len(iterable)
    n = min(l, n)
    k, m = divmod(l, n)
    return (iterable[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

N_PROCESSES = 8

def multiprocess_sum(l):
    if len(l) == 1:
        return l[0]

    task_queue = multiprocessing.Queue()
    # split may yield fewer than N_PROCESSES chunks for short inputs,
    # so materialize the chunks and size everything off their count:
    chunks = list(split(l, N_PROCESSES))
    for chunk in chunks:
        task_queue.put(chunk)

    result_queue = multiprocessing.Queue()

    processes = [
        CustomProcess(name=str(i), task_queue=task_queue, result_queue=result_queue)
        for i in range(len(chunks))
    ]
    for process in processes:
        process.start()

    the_sum = sum(result_queue.get() for _ in processes)

    for process in processes:
        process.join()

    return the_sum

if __name__ == "__main__":
    to_sum = list(range(1350))
    print(sum(to_sum))
    for i in range(5):
        print(multiprocess_sum(to_sum))

Prints:

910575
910575
910575
910575
910575
910575
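As a sanity check on the split helper, here is what the partitioning looks like on a small input (the helper is reproduced verbatim so the snippet is self-contained):

```python
def split(iterable, n):  # split iterable into n parts of near-equal length
    if type(iterable) is range and iterable.step != 1:
        iterable = list(iterable)  # the slicing math assumes step 1
    l = len(iterable)
    n = min(l, n)
    k, m = divmod(l, n)
    # The first m parts get k + 1 elements, the rest get k elements.
    return (iterable[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

parts = [list(part) for part in split(range(10), 3)]
print(parts)  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```

Note that for inputs shorter than n the helper returns fewer than n parts, so the number of worker processes should be derived from the actual chunk count.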

I wonder how large the to_sum list would have to be before using multiprocessing for this particular problem actually saves time.


0 votes

Solved the problem by using a manager.

import multiprocessing
"""
Using a manager avoids the problem where the program will hang if the input is too big.
"""

class CustomProcess(multiprocessing.Process):

    def __init__(self, name, task_queue, result_queue, lock, chunks=2, *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.name = name
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.chunks = chunks
        self.lock = lock

    def run(self):
        while True:
            """
            Using a lock to avoid a race condition where two processes each get a number,
            then each get another, but it is None, so each puts its lone number into the
            result queue, making the result queue bigger than expected. For example:
            Expected result_queue size = 500, current size 499; after summing 1 and 2 we
            add 3 to the result queue, reaching 500.
            With the race condition the result is 501:
            [1, 2, None, None]
            Process 1 gets 1.
            Process 2 gets 2.
            Process 1 gets None and puts 1 in the result queue instead of getting 2 and summing.
            Process 2 gets None and puts 2 in the result queue instead of just getting the first None and returning.
            A lock around both gets removes the race condition.
            """
            with self.lock:
                if not self.task_queue.empty():
                    number_1 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_1 is None:
                        #Poison pill - terminate.
                        print(f"Terminated {self.name}")
                        return
                else:
                    #Queue empty - terminate.
                    return
                if not self.task_queue.empty():
                    number_2 = self.task_queue.get()
                    self.task_queue.task_done()
                    if number_2 is None:
                        #Cannot compute sum of 1 number so just add number_1 to result_queue and terminate since poison pill
                        #acquired.
                        self.result_queue.put(number_1)
                        print(f"Terminated {self.name}")
                        return
                else:
                    self.result_queue.put(number_1)
                    #Queue empty, put the 1 number in result queue and terminate.
                    return
            self.result_queue.put(number_1 + number_2)


def multiprocess_sum(array):
    if len(array) == 1:
        return array[0]
    lock = multiprocessing.Lock()

    with multiprocessing.Manager() as manager:

        task_queue = manager.Queue()
        [task_queue.put(element) for element in array]
        task_queue_size = len(array)

        while task_queue_size > 1:
            result_queue = manager.Queue()

            processes = [CustomProcess(name=str(i), task_queue=task_queue, result_queue=result_queue, lock=lock)
                        for i in range(8)]
            [task_queue.put(None) for process in processes]
            [process.start() for process in processes]
            #[process.join() for process in processes]
            task_queue.join()
            task_queue = result_queue
            task_queue_size = task_queue_size // 2 + task_queue_size % 2
        return result_queue.get()

if __name__ == "__main__":
    to_sum = [i for i in range(2000)]

    print(sum(to_sum))
    for i in range(5):
        print(multiprocess_sum(to_sum))
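To sketch why the manager sidesteps the hang: a manager.Queue is a proxy to a queue.Queue living in a separate server process, so each put completes as a call over the proxy connection and the child has no feeder thread that must flush before it can exit. A minimal illustration with a hypothetical producer function (modest item count, since every proxy call is a round trip to the server):

```python
import multiprocessing

def producer(q, n):
    # Hypothetical producer: each put is a synchronous call to the
    # manager's server process, so nothing is buffered in this child.
    for i in range(n):
        q.put(i)

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        q = manager.Queue()
        p = multiprocessing.Process(target=producer, args=(q, 10_000))
        p.start()
        # Safe here: unlike a plain multiprocessing.Queue, the child has
        # no feeder thread to flush, so joining before draining cannot
        # deadlock on a full pipe buffer.
        p.join()
        print(q.qsize())  # 10000
```

The trade-off is throughput: proxy round trips are much slower than writes into a local pipe, which is acceptable here but worth knowing.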

I have to agree with user Booboo that this particular implementation is not the best (it may in fact be the worst) for a high-performance parallel summation function. If you are looking for a parallel sum function, read his answer. If you are interested in why your program hangs when faced with a large queue, using a manager should solve your problem. If you have an implementation of a parallel reduce function (one that handles the poison-pill race condition better), please share it to improve on the implementation offered here for those specifically looking for one. Regards, Tari
