Returning data from a queue read in a separate process back to the main program

Problem description

I am computing data with multiprocessing. The "worker" script puts its results into a queue, and I can then easily read them back in the main program with:

result=[]
while not Q.empty():
    result.append(Q.get())

However, when there is a lot of data in the queue, I figured I would be better off dedicating one or two CPU cores to reading the queue while the worker processes keep working. I found some code here: How to use multiprocessing Queue in Python? It creates "reader" processes that keep reading the queue until the workers tell them the computation is over. That works fine, but the readers only read the queue and do not return anything. How do I actually get the data out of the queue so I can use it in my main program? The only solution I found was to create a list with multiprocessing.Manager() and pass it as an argument to the readers. That works, but it takes a very long time! It completely kills the execution time of my program, so the first approach (reading the queue directly in the main program) is much better. Do I have any other options? Is it actually possible to get the data from a queue that I read in a separate process back into the main program?

Below is a sample code, built from various snippets gathered here and there:

import multiprocessing as mp
from datetime import datetime


def worker(numbers, start, end, qu):
    """A worker function to calculate squares of numbers."""
    res=[]
    for i in range(start, end):
        res.append(numbers[i] * numbers[i])
    qu.put(res)

def reader(q, outputlist):
    """Read from the queue; this runs as a separate Process"""
    while True:
        msg = q.get()  # Blocks until a message is available
        if msg == "DONE":
            break
        outputlist.extend(msg)  # comment out if using 1st method (reading queue in the main program)
    return

def start_reader_procs(q, num_of_reader_procs, L):
    """Start the reader processes and return all in a list to the caller
    source: 
    https://stackoverflow.com/questions/11515944/how-to-use-multiprocessing-queue-in-python"""
    
    all_reader_procs = list()
    for ii in range(0, num_of_reader_procs):
        ### reader() reads from q as a separate process...
        ###    you can spawn as many reader() procs as you like
        ###    however, there is usually a point of diminishing returns
        reader_p = mp.Process(target=reader, args=(q, L))
        reader_p.daemon = True
        reader_p.start()  # Launch reader_p() as another proc
        
        all_reader_procs.append(reader_p)

    return all_reader_procs 
    
def main(core_count):
    numbers = range(50000)  # A larger range for a more evident effect of multiprocessing
    segment = len(numbers) // core_count
    processes = []
    #Q = mp.Queue()
    m = mp.Manager()
    Q = m.Queue()
    
    # comment out if using 1st method (reading queue in the main program)
    #----------------------------------
    num_of_reader_procs=2
    L = m.list()
    all_reader_procs = start_reader_procs(Q, num_of_reader_procs, L)
    #-----------------------------------------
    
    for i in range(core_count):
        start = i * segment
        if i == core_count - 1:
            end = len(numbers)  # Ensure the last segment goes up to the end
        else:
            end = start + segment
        # Creating a process for each segment
        p = mp.Process(target=worker, args=(numbers, start, end, Q))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print("All worker processes terminated")

    # comment out if using 1st method (reading queue in the main program)
    #----------------------------------
    ### Tell all readers to stop...
    for ii in range(0, num_of_reader_procs):
        Q.put("DONE")
    for idx, a_reader_proc in enumerate(all_reader_procs):
        print("    Waiting for reader_p.join() index %s" % idx)
        a_reader_proc.join()  # Wait for a_reader_proc() to finish
        print("        reader_p() idx:%s is done" % idx)    
    result=list(L)
    #----------------------------------
    
#   result=[]
#   while not Q.empty():
#       result.append(Q.get())
#   result = [x for L in result for x in L] # flatten the list of lists
    
    return result

if __name__ == '__main__':
    
    for core_count in [1, 2, 4]:
        
        starttime = datetime.now()
        print(f"Using {core_count} core(s):")
        result = main(core_count)
        print(f"First 10 squares: {list(result)[:10]}")  # Display the first 10 results as a sample
        endtime = datetime.now()
        print ("Total computation time : {:.1f} sec".format((endtime-starttime).total_seconds()))
        print()
1 Answer

The issue here is how best to make the queue available to the subprocesses.

One technique is to use a multiprocessing Pool with an initializer that places the queue in the global namespace of the child processes.

Here is an example that you should be able to adapt to your needs:

from multiprocessing import Pool, Queue

def worker(a):
    global q
    lst = [i * i for i in range(*a)]
    q.put(lst)

def init(_q):
    global q
    q = _q

def main():
    q = Queue()
    args = [(i, i + 10) for i in range(0, 100, 10)]
    # initargs passes the queue to every pool worker at start-up via init()
    with Pool(initializer=init, initargs=(q,)) as pool:
        pool.map(worker, args)
        pool.close()
        pool.join()
    # All tasks are done, so the queue can be drained in the main process
    while not q.empty():
        print(q.get())

if __name__ == "__main__":
    main()

Output:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
[100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
[400, 441, 484, 529, 576, 625, 676, 729, 784, 841]
[900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521]
[1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401]
[2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481]
[3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761]
[4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241]
[6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921]
[8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]
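
As a side note on your original reader-process design: the slowdown comes largely from appending to the Manager().list() item by item, since every operation on a manager proxy is a round trip to the manager process. A lower-overhead variant is to let each reader accumulate results in an ordinary local list and send the whole list back once, through a second plain multiprocessing.Queue, when it sees "DONE". This is only a minimal sketch of that idea, not the approach above; the names work_q and results_q are mine, and the worker count and segment sizes are hard-coded for brevity:

import multiprocessing as mp

def reader(work_q, results_q):
    """Accumulate results locally and send them back in a single message."""
    collected = []
    while True:
        msg = work_q.get()
        if msg == "DONE":
            break
        collected.extend(msg)
    results_q.put(collected)  # one put per reader instead of one per item

def worker(numbers, start, end, work_q):
    work_q.put([numbers[i] * numbers[i] for i in range(start, end)])

if __name__ == "__main__":
    numbers = range(50000)
    work_q = mp.Queue()
    results_q = mp.Queue()

    readers = [mp.Process(target=reader, args=(work_q, results_q)) for _ in range(2)]
    for r in readers:
        r.start()

    workers = [mp.Process(target=worker, args=(numbers, i * 12500, (i + 1) * 12500, work_q))
               for i in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    # Tell the readers to stop
    for _ in readers:
        work_q.put("DONE")

    # Drain the results queue BEFORE joining the readers; a reader holding a
    # large message cannot exit until its data has been consumed from the pipe.
    result = []
    for _ in readers:
        result.extend(results_q.get())
    for r in readers:
        r.join()

    print(len(result), result[:10])

Whether this beats reading the queue directly in the main program depends on how expensive the reads are relative to the computation; for small result lists the single-process read you started with may well remain the fastest option.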