Python 多处理队列在没有等待计时器的情况下挂起

问题描述 投票:0回答:1

我遇到队列无法按预期工作的问题。这种设计的目标是让

muncher
可以根据需要花费尽可能少的时间或尽可能长的时间,但它总是能够充满工作(输入队列总是有东西,输出队列总是有空间)。我想最大化
muncher
的数据处理,同时
yoinker
yeeter
负责将数据移入/移出。

测试代码的要点是

yoink
一些数据(目前是凭空产生的),
munch
在其上,然后
yeet
将其放入以太中。
Yoinking
填充输入队列,
munching
使用输入队列并转储到输出队列,
yeeting
从输出队列中拉出。

理想情况下,它应该一切正常,我指望在停顿事件期间从队列中隐式等待(例如,

yoinker
填充输入队列,其线程在队列已满时停止;
yeeter
的输出队列为空,当输出队列被填满时,它的线程会停止)。

这似乎并不可靠地发生,这对于可靠地使用代码来说是有问题的。

如果没有等待计时器,单线程

yoinker
调用者将无法退出所有
muncher
/
yeeter
线程。通过
munching
/
yeeting
段上的等待计时器,单线程
yoinker
调用者通常可以完成它。

通过

munching
/
yeeting
段上的等待计时器,并行
yoinker
调用者甚至无法到达退出阶段。通过
yoinking
段以及
munching
/
yeeting
段上的等待计时器,它会成功。

我无法控制

yoink
/
munch
/
yeet
段完成的速度(有时可能是瞬时的,可能是连续多个几乎瞬时的
yoinks
或其他),有办法吗为了使这一切在并行
yoinker
调用者的情况下变得可靠,而不需要任何等待计时器?

import numpy as np
import multiprocessing as mp
from time import sleep


#SOMETHING -> INPUT QUEUE
def yoinker(incrementer):
    #--- make some data ---
    data_in = np.ones((2000,2000)); #make some big data
    data_in = data_in/np.sum(data_in)*incrementer; #tag it
    
    #--- ditch the data ---
    qb_in.put(data_in); #jam the data in
    
    # sleep(np.random.random()); #nap time! wait between 0 and 1 seconds
#END DEF

#INPUT QUEUE -> SOMETHING -> OUTPUT QUEUE
def muncher(qb_in, qb_out):
    while True: #these are eternal functions that continually work on a queue that has no predefined end (to them)
        #--- get the data out ---
        data_in = qb_in.get(); #get the data out
        if( data_in is None ):
            break #this is how this gets to end
        #END IF
        
        #--- do something with the data ---
        data_out = data_in.copy(); #so much
        
        #--- ditch the data ---
        qb_out.put(data_out); #jam the data in
        
        # sleep(np.random.random()); #nap time! wait between 0 and 1 seconds
        
    #END WHILE
#END DEF

#OUTPUT QUEUE -> SOMETHING
def yeeter(qb_out):
    while True: #these are eternal functions that continually work on a queue that has no predefined end (to them)
        #--- get the data out ---
        data_out = qb_out.get(); #get the data out
        if( data_out is None ):
            break #this is how this gets to end
        #END IF
        
        #--- save the data ---
        # print('got data_out, sum is: '+str(np.round(np.sum(np.sum(data_out))))); #do some reporting
        data_out = np.round(np.sum(np.sum(data_out)));
        
        # sleep(np.random.random()); #nap time! wait between 0 and 1 seconds
    #END WHILE
#END DEF

def parallel_starmap_helper(fn, args): #for multiprocess starmap with kwargs, MUST be outside of the function that calls it or it just hangs
    return fn(*args)
#END DEF

def initer(_qb_in): #basically each process will call this function when it starts (b/c it's defined as an "initializer")
    global qb_in; #it lets each process know "qb_in" is a global variables (outside of the scope of the code, e.g., they'll appear w/o being defined)
    qb_in = _qb_in; #link em up here
#END DEF

if __name__=='__main__':
    #--- settings ---
    threads_in = 2; #number of threads for the input process (reads files, fills input queue with resulting data)
    threads_calc = 4; #number of threads for the calc process (reads input queue's resulting data, converts, fills output queue)
    threads_out = 2; #number of threads for the output process (reads output queue's converted data, saves files)
    queueSize_in = 5; # how many input files to hold (if emptied, stalls conversion)
    queueSize_out = 5; #how many output files to hold (if filled, stalls conversion)
    
    #--- define queues that hold input and output datas ---
    qb_in = mp.Queue(maxsize=queueSize_in); # Queue to hold input things
    qb_out = mp.Queue(maxsize=queueSize_out); # Queue to hold output things
    
    #--- build data generator parallel lists (not using queues) ---
    parallel_list = []; #Prep
    for j in range(0, 25):
        parallel_list.append([yoinker, [j]]); # pack up all needed function calls
    #END FOR j
    
    #--- build data cruncher lists (using queues) ---
    munchers = [mp.Process(target=muncher,args=(qb_in, qb_out)) for i in range(threads_calc)] # this function gets the data from the input queue, processes it, and then puts in the output queue
    yeeters = [mp.Process(target=yeeter,args=(qb_out, )) for i in range(threads_out)] # this function gets data processed and does the final steps
    
    #--- start up all processes that are NOT blocking ---
    for munch in munchers:
        munch.daemon = True; #say it lives for others
        munch.start(); #start each muncher up
    #END FOR munch
    for yeet in yeeters:
        yeet.daemon = True; #say it lives for others
        yeet.start(); #start each yeeter up
    #END FOR yeet
        
    for j in range(0, 25):
        yoinker(j); # pack up all needed function 
        print('placed j'+str(j))
    #END FOR j
    # #--- call blocking data generator ---
    # with mp.Pool(processes=threads_in, initializer=initer, initargs=(qb_in,)) as executor:
    #     executor.starmap(parallel_starmap_helper, parallel_list); #function you want is replaced with; parallel_starmap_kwarg_helper helps starmap distribute everything right
    # #END WITH
    for j in range(0, threads_calc):
        qb_in.put(None); #tell all muncher threads to quit (they get out of the qb_in queue)
        print('qb_in - Put a None')
    #END FOR j
    for j in range(0, threads_out):
        qb_out.put(None); #tell all yeeter threads to quit (they get out of the qb_out queue)
        print('qb_out - Put a None')
    #END FOR j

    #--- This portion lets you know if the code has hung ---
    #it does this via checking the queues. The queues should end as exactly enough `None`s have been put in to end all of the queues started, but without timers they do not always end.
    #This is here to give some feedback, since calling `.join()` on the processes will just sit there silently.
    FLG_theyDone = False;
    while( FLG_theyDone == False ):
        FLG_theyDone = True;
        print('\nStarting loop')
        if( qb_in.full() == True ):
          print('qb_in full');
        elif( qb_in.empty() == True ):
            print('qb_in empty');
        #END IF
        if( qb_out.full() == True ):
          print('qb_out full');
        elif( qb_out.empty() == True ):
            print('qb_out empty');
        #END IF
        
        for munch in munchers:
            print('munch - '+str(munch.exitcode))
            if( munch.exitcode is None ):
                FLG_theyDone = False;
                # try:
                #     qb_in.put(sentinel, block=False); #tell all muncher threads to quit
                # except:
                #     print('qb_in full, can\'t jam more Nones');
                # #END TRYING
            #END IF
        #END FOR munch
        # print('yeeters - '+str(yeeters))
        for yeet in yeeters:
            print('yeet - '+str(yeet.exitcode))
            if( yeet.exitcode is None ):
                FLG_theyDone = False;
                # try:
                #     qb_out.put(sentinel, block=False); #tell all muncher threads to quit
                # except:
                #     print('qb_outn full, can\'t jam more Nones');
                # #END TRYING
            #END IF
        #END FOR yeet
        
        sleep(np.random.random()+2); #nap time! wait between 0 and 1 seconds
    #END IF
    
    #--- set up a place for them to end ---
    for munch in munchers:
        munch.join(); #end each muncher
    #END FOR munch
    for yeet in yeeters:
        yeet.join(); #end each yeeter
    #END FOR yeet
#END IF
python multiprocessing queue
1个回答
0
投票

问题是您同时为所有咀嚼者和 yeeters 发布 None 。确保咀嚼者已经咀嚼完毕(加入线程),然后为咀嚼者发布“无”并加入他们。

    for j in range(25):
        yoinker(j)
        print(f'placed j{j}')

    for j in range(threads_calc):
        qb_in.put(None)
        print('qb_in - Put a None')

    for munch in munchers:  # Move the join here.
        munch.join()

    for j in range(threads_out):
        qb_out.put(None)
        print('qb_out - Put a None')

    for yeet in yeeters:
        yeet.join()
© www.soinside.com 2019 - 2024. All rights reserved.