如何使用 Python 多重处理与未定义的 Queue.task_done()

问题描述 投票:0回答:1

我有以下脚本,我想通过在 CPU 核心之间拆分任务来加快处理速度。

myscript.py

from queue import Queue
from worker_script import Worker
    
task_queue = Queue(maxsize=1)

while some_other_functions:
    task_queue.put(Some_Stuff)          ## Step happens randomly anytime. 
                                        ## Hence number of input tasks cannot be known

Worker(task_queue).start()

task_queue.join()

## further script that needs the above task_queue to be successfully completed first

worker_script.py

class Worker:

    def __init__(self, task_queue):
        self.task_queue = task_queue

    def run(self):
        while True:

            task_object = self.task_queue.get()
            
            do_some_big_calculation(task_object)       #### The Step I want to use Multiprocess

            self.task_queue.task_done()

如果我要将多重处理应用于

do_some_big_calculation(task_object)
,我将如何将
task_done()
方法应用于
task_queue
? 我正在申请
task_done()
,以便
task_queue.join()
拦截者知道分配的所有任务都已完成。

我在

worker_script.py
中的实现是:

import multiprocessing

class Worker:

    def __init__(self, task_queue):
        self.task_queue = task_queue

    def run(self):
        num_processes = multiprocessing.cpu_count() - 2
        jobs = []
        with multiprocessing.Pool(processes=num_processes) as pool:
            while True:
                new_job = pool.apply_async(self.sub_worker)
                jobs.append(new_job)

    def sub_worker(self):
        while True:

            task_object = self.task_queue.get()
            
            do_some_big_calculation(task_object)       #### The Step I want to use Multiprocess

            self.task_queue.task_done()

我依靠

.get()
来阻止多重处理,直到另一个任务可用。 然而,我修改后的实现陷入了我无法弄清楚的地方。

对我的实施或全新解决方案有任何帮助,我们将不胜感激! 谢谢!

python multiprocessing queue
1个回答
0
投票

你说你“被困在某个地方”,但你还没有发布一个最小的、可重现的例子,所以很难确定你被困在哪里以及为什么被困(无论被困意味着什么)。

我确实注意到你的

run
方法有一个无限循环提交“作业”,所以程序永远不会完成。在某些时候,
run
必须决定终止,然后它可以向
task_queue.join()
发出调用以等待所有任务完成。但是,如果您想在
task_done
上调用方法
self.task_queue
,那么
self.task_queue
必须是
multiprocessing.JoinableQueue
的实例,但事实并非如此。您还可以在
start
实例上调用不存在的
Worker
方法。

您根本不需要使用

multiprocesssing.JoinableQueue
实例,因为
multiprocessing.pool Pool
实例有自己的内部队列用于提交作业和接收结果。您的代码应该看起来更像:

worker_script.py

import multiprocessing

class Worker:
    def __init__(self):
        # Why multiprocessing.cpu_count() - 2?
        num_processes = multiprocessing.cpu_count() - 2
        self._pool = Pool(num_processes)

    def submit_task(self, task_object):
        # Node need to save returned AsyncResult instance unless you need a
        # return value from sub_worker:
        self.pool.apply_async(self.sub_worker, args=(task_object,))
        
    def terminate(self):
        """Wait for all submitted tasks to complete."""
        self._pool.close()
        self._pool.join()
    
    def sub_worker(self, task_object):
        do_some_big_calculation(task_object)

myscript.py

def main():
    worker = Worker()

    while some_other_functions:
        worker.submit_task(task_object)
        
    # Wait for all submitted tasks to complete:
    worker.terminate()

if __name__ == '__main__'
    main()
© www.soinside.com 2019 - 2024. All rights reserved.