I'm running into a problem where the queues don't behave as I expect. The goal of the design is that the muncher can take as little or as much time as it needs, but it is always saturated with work (the input queue always has something in it, and the output queue always has room). I want to maximize the muncher's data throughput, while the yoinker and the yeeter handle moving data in and out.

The gist of the test code is to yoink some data (conjured out of thin air for now), munch on it, and then yeet it into the ether. Yoinking fills the input queue, munching consumes the input queue and dumps into the output queue, and yeeting pulls from the output queue.

Ideally it should all just work, and I'm counting on the implicit waiting the queues provide during stall events (e.g., the yoinker fills the input queue and its thread stalls while the queue is full; the yeeter's output queue is empty and its thread stalls until the output queue gets something in it). That doesn't seem to happen reliably, which is a problem for using this code dependably.

Without any wait timers, the single-threaded yoinker caller can't get all of the muncher/yeeter threads to exit. With wait timers on the munching/yeeting segments, the single-threaded yoinker caller usually manages to finish. With wait timers on the munching/yeeting segments, the parallel yoinker caller doesn't even reach the exit phase. With wait timers on the yoinking segment as well as the munching/yeeting segments, it succeeds.

I have no control over how quickly the yoink/munch/yeet segments complete (sometimes they may be nearly instantaneous, possibly several near-instantaneous yoinks in a row, and so on). Is there a way to make all of this reliable for the parallel yoinker caller, without any wait timers?
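For clarity, here is a tiny stand-alone sketch (separate from the full test code below) of the blocking behavior I'm counting on; the queue name and size are just for illustration:

import multiprocessing as mp
if __name__ == '__main__':
    q = mp.Queue(maxsize=2) #bounded queue, like qb_in/qb_out in the real code
    q.put('a'); q.put('b') #the queue is now full
    #a third q.put() would block here until something is .get()ed out,
    #which is the implicit stall I want the yoinker to hit when qb_in is full
    print(q.get()) #frees a slot, so a blocked put() could proceed
    #likewise, q.get() on an empty queue blocks until something is put in,
    #which is the implicit stall I want the muncher/yeeter to hit

The full test code follows.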
import numpy as np
import multiprocessing as mp
from time import sleep
#SOMETHING -> INPUT QUEUE
def yoinker(incrementer):
    #--- make some data ---
    data_in = np.ones((2000,2000)); #make some big data
    data_in = data_in/np.sum(data_in)*incrementer; #tag it
    #--- ditch the data ---
    qb_in.put(data_in); #jam the data in
    # sleep(np.random.random()); #nap time! wait between 0 and 1 seconds
#END DEF
#INPUT QUEUE -> SOMETHING -> OUTPUT QUEUE
def muncher(qb_in, qb_out):
    while True: #these are eternal functions that continually work on a queue that has no predefined end (to them)
        #--- get the data out ---
        data_in = qb_in.get(); #get the data out
        if( data_in is None ):
            break #this is how this gets to end
        #END IF
        #--- do something with the data ---
        data_out = data_in.copy(); #so much
        #--- ditch the data ---
        qb_out.put(data_out); #jam the data in
        # sleep(np.random.random()); #nap time! wait between 0 and 1 seconds
    #END WHILE
#END DEF
#OUTPUT QUEUE -> SOMETHING
def yeeter(qb_out):
    while True: #these are eternal functions that continually work on a queue that has no predefined end (to them)
        #--- get the data out ---
        data_out = qb_out.get(); #get the data out
        if( data_out is None ):
            break #this is how this gets to end
        #END IF
        #--- save the data ---
        # print('got data_out, sum is: '+str(np.round(np.sum(np.sum(data_out))))); #do some reporting
        data_out = np.round(np.sum(np.sum(data_out)));
        # sleep(np.random.random()); #nap time! wait between 0 and 1 seconds
    #END WHILE
#END DEF
def parallel_starmap_helper(fn, args): #for multiprocess starmap with kwargs, MUST be outside of the function that calls it or it just hangs
    return fn(*args)
#END DEF
def initer(_qb_in): #basically each process will call this function when it starts (b/c it's defined as an "initializer")
    global qb_in; #it lets each process know "qb_in" is a global variable (outside of the scope of the code, e.g., it'll appear w/o being defined)
    qb_in = _qb_in; #link em up here
#END DEF
if __name__=='__main__':
    #--- settings ---
    threads_in = 2; #number of threads for the input process (reads files, fills input queue with resulting data)
    threads_calc = 4; #number of threads for the calc process (reads input queue's resulting data, converts, fills output queue)
    threads_out = 2; #number of threads for the output process (reads output queue's converted data, saves files)
    queueSize_in = 5; #how many input files to hold (if emptied, stalls conversion)
    queueSize_out = 5; #how many output files to hold (if filled, stalls conversion)
    #--- define queues that hold input and output datas ---
    qb_in = mp.Queue(maxsize=queueSize_in); #Queue to hold input things
    qb_out = mp.Queue(maxsize=queueSize_out); #Queue to hold output things
    #--- build data generator parallel lists (not using queues) ---
    parallel_list = []; #prep
    for j in range(0, 25):
        parallel_list.append([yoinker, [j]]); #pack up all needed function calls
    #END FOR j
    #--- build data cruncher lists (using queues) ---
    munchers = [mp.Process(target=muncher, args=(qb_in, qb_out)) for i in range(threads_calc)] #this function gets the data from the input queue, processes it, and then puts it in the output queue
    yeeters = [mp.Process(target=yeeter, args=(qb_out, )) for i in range(threads_out)] #this function gets the processed data and does the final steps
    #--- start up all processes that are NOT blocking ---
    for munch in munchers:
        munch.daemon = True; #say it lives for others
        munch.start(); #start each muncher up
    #END FOR munch
    for yeet in yeeters:
        yeet.daemon = True; #say it lives for others
        yeet.start(); #start each yeeter up
    #END FOR yeet
    #--- call the data generator directly (single-threaded yoinker caller) ---
    for j in range(0, 25):
        yoinker(j); #call the yoinker in this process
        print('placed j'+str(j))
    #END FOR j
    # #--- call blocking data generator (parallel yoinker caller) ---
    # with mp.Pool(processes=threads_in, initializer=initer, initargs=(qb_in,)) as executor:
    #     executor.starmap(parallel_starmap_helper, parallel_list); #parallel_starmap_helper helps starmap distribute everything right
    # #END WITH
    for j in range(0, threads_calc):
        qb_in.put(None); #tell all muncher threads to quit (they get out of the qb_in queue)
        print('qb_in - Put a None')
    #END FOR j
    for j in range(0, threads_out):
        qb_out.put(None); #tell all yeeter threads to quit (they get out of the qb_out queue)
        print('qb_out - Put a None')
    #END FOR j
    #--- This portion lets you know if the code has hung ---
    #It does this by checking the queues and the worker exit codes. Everything should end because exactly enough `None`s have been put in to stop all of the processes that were started, but without timers they do not always end.
    #This is here to give some feedback, since calling `.join()` on the processes would just sit there silently.
    FLG_theyDone = False;
    while( FLG_theyDone == False ):
        FLG_theyDone = True;
        print('\nStarting loop')
        if( qb_in.full() == True ):
            print('qb_in full');
        elif( qb_in.empty() == True ):
            print('qb_in empty');
        #END IF
        if( qb_out.full() == True ):
            print('qb_out full');
        elif( qb_out.empty() == True ):
            print('qb_out empty');
        #END IF
        for munch in munchers:
            print('munch - '+str(munch.exitcode))
            if( munch.exitcode is None ):
                FLG_theyDone = False;
                # try:
                #     qb_in.put(sentinel, block=False); #tell all muncher threads to quit
                # except:
                #     print('qb_in full, can\'t jam more Nones');
                # #END TRYING
            #END IF
        #END FOR munch
        # print('yeeters - '+str(yeeters))
        for yeet in yeeters:
            print('yeet - '+str(yeet.exitcode))
            if( yeet.exitcode is None ):
                FLG_theyDone = False;
                # try:
                #     qb_out.put(sentinel, block=False); #tell all yeeter threads to quit
                # except:
                #     print('qb_out full, can\'t jam more Nones');
                # #END TRYING
            #END IF
        #END FOR yeet
        sleep(np.random.random()+2); #nap time! wait between 2 and 3 seconds
    #END WHILE
    #--- set up a place for them to end ---
    for munch in munchers:
        munch.join(); #end each muncher
    #END FOR munch
    for yeet in yeeters:
        yeet.join(); #end each yeeter
    #END FOR yeet
#END IF
The problem is that you put the Nones for all of the munchers and all of the yeeters at the same time. Because the output queue is bounded, a yeeter can pull its None and exit while the munchers are still producing; once every yeeter has exited, nothing drains qb_out, so it fills up, the munchers block forever in qb_out.put(), and they never reach their own None in qb_in. Make sure the munchers are done munching (join their processes) before you put the Nones for the yeeters and join them.
for j in range(25):
    yoinker(j)
    print(f'placed j{j}')
for j in range(threads_calc):
    qb_in.put(None)
    print('qb_in - Put a None')
for munch in munchers: # Move the join here.
    munch.join()
for j in range(threads_out):
    qb_out.put(None)
    print('qb_out - Put a None')
for yeet in yeeters:
    yeet.join()
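The same staged shutdown should also work for the parallel yoinker caller without any wait timers, since starmap only returns once every yoinker call has finished its put. A sketch (untested against your exact setup) that uses the pool block you have commented out, in place of the single-threaded loop above:

with mp.Pool(processes=threads_in, initializer=initer, initargs=(qb_in,)) as executor:
    executor.starmap(parallel_starmap_helper, parallel_list) #blocks until every yoinker has put its data into qb_in
for j in range(threads_calc):
    qb_in.put(None) #the Nones can only land behind the real work already in qb_in
for munch in munchers:
    munch.join() #wait until every muncher has drained qb_in and pushed its last output
for j in range(threads_out):
    qb_out.put(None) #only now tell the yeeters to stop
for yeet in yeeters:
    yeet.join()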