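Using Python's multiprocessing library to run two threads, putting items into a queue from the child thread and getting them from the parent thread, hangs on the last item.
The question is: why does this happen, and how do we make it work?
The complete code is here. It is for an old USB spectrometer.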
https://drive.google.com/file/d/18kRFCnqO1GfAdrbgPYOFXp6LUdjy014s/view?usp=drive_link
https://drive.google.com/file/d/1Q0b0i_VLBBpKIapGReJ4wdVr1CiJHS1a/view?usp=drive_link
The following is a rough excerpt that shows where the problem is.
In the setup we have:
    from multiprocessing import Process, Queue, Value

    class Gizmo:
        def __init__(self):
            # blah blah blah, set up the hardware and create a multiprocessing Queue
            self.dataqueue = Queue()

        def startreader(self, nframes, nsets):
            # clear the dataqueue
            while not self.dataqueue.empty():
                try:
                    self.dataqueue.get(False)
                    print( 'queue got entry' )
                except Exception as e:
                    print( 'queue.get', e )
                    break
            print( 'creating reader thread')
            self.readerthread = Process( target = self.Reader_, args=(nframes,nsets) )
            if self.readerthread is None:
                print( 'creating reader thread failed')
                return False
            print( 'starting reader thread')
            self.readerthread.start()

        def Reader_(self, nframes, nsets):
            # blah blah blah, get a bunch of records and then
            for n,record in enumerate(records):
                self.dataqueue.put( record )
                print( 'reader put record', n )
            return

        def savedata(self):
            records = []
            while not self.dataqueue.empty():
                print( 'getting record')
                try:
                    record = self.dataqueue.get_nowait()
                    records.append(record )
                    print( 'got record')
                except Exception as e:
                    print(e)
            # blah blah blah and write it all to a disk file
When we run this, we see the four records being pushed onto the queue by the reader in the second thread.

    reader put record 0
    reader put record 1
    reader put record 2
    reader put record 3
Then, after seeing the reader exit, we call save(). We see 3 of the 4 records retrieved, and then it keeps trying to get the fourth record.

    getting records
    getting record
    got record
    getting record
    got record
    getting record
    got record
    getting record
Again, the questions are:
Why does it hang?
How do we make it work?
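This is a common problem with multiprocessing and queues. It happens because the process that writes to the queue (the `Reader_` function) finishes and exits before the `savedata` function has retrieved all the items from the queue. That is, the process putting items into the queue terminates before the main process has retrieved them all.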
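It hangs because, when the `Reader_` function finishes executing, the process associated with it terminates. If the queue still holds items when that process terminates, the queue may not flush all of them properly.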
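In the `savedata` method, you try to get records from the queue until it is completely empty. If the queue's pipe is closed and the writing process has exited, `Queue.get_nowait()` will hang, waiting for more data that never arrives.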
Here is the code with comments added at each change, marked with the text `# part of the solution`:
    def startreader(self, nframes, nsets):
        # clear the dataqueue
        while not self.dataqueue.empty():
            try:
                self.dataqueue.get(False)
                print('queue got entry')
            except Exception as e:
                print('queue.get', e)
                break
        print('creating reader thread')
        self.readerthread = Process(target=self.Reader_, args=(nframes, nsets))
        if self.readerthread is None:
            print('creating reader thread failed')
            return False
        print('starting reader thread')
        self.readerthread.start()

    def Reader_(self, nframes, nsets):
        # blah blah blah, get a bunch of records and then
        for n, record in enumerate(records):
            self.dataqueue.put(record)
            print('reader put record', n)
        # part of the solution : Put a value to signal the end of the queue
        self.dataqueue.put(None)

    def savedata(self):
        records = []
        while True:
            print('getting record')
            try:
                # part of the solution : block until the next item arrives,
                # so the loop cannot stop before the sentinel is seen
                record = self.dataqueue.get()
                # part of the solution : Check for sentinel
                if record is None:
                    break
                records.append(record)
                print('got record')
            except Exception as e:
                print(e)
                break
        # part of the solution : join the process
        # Wait for the reader process to complete, but only after the queue
        # has been drained; joining while the child still has buffered items
        # can deadlock
        self.readerthread.join()
        # blah blah blah and write it all to a disk file
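For a version that can be run on its own, here is a minimal sketch of the sentinel approach. It is not the spectrometer code: `producer`, `drain_until_sentinel` and `SENTINEL` are hypothetical names made up for the illustration, and the records are just integers. It assumes a blocking `get()` is acceptable in the parent, and it calls `join()` only after the queue has been drained, which is the order the Python documentation recommends for processes that put items on a queue.

```python
import multiprocessing as mp

# Hypothetical end-of-data marker; it must never occur as a real record.
# None is convenient because it is still the same singleton after pickling.
SENTINEL = None


def producer(q, n):
    """Child process: put n dummy records on the queue, then the sentinel."""
    for i in range(n):
        q.put(i)
    q.put(SENTINEL)


def drain_until_sentinel(q):
    """Collect items with a blocking get() until the sentinel is seen."""
    records = []
    while True:
        record = q.get()  # blocks until the next item is available
        if record is SENTINEL:
            break
        records.append(record)
    return records


if __name__ == "__main__":
    q = mp.Queue()
    reader = mp.Process(target=producer, args=(q, 4))
    reader.start()
    records = drain_until_sentinel(q)  # drain the queue first ...
    reader.join()                      # ... and only then join the child
    print(records)
```

Because the parent blocks on `get()`, it does not matter whether the child has already exited when the parent starts reading. The drawback is that the parent waits forever if the child dies before it sends the sentinel.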
Alternatively, `savedata` can use a timeout instead of a sentinel:

    def savedata(self):
        records = []
        while True:
            print('getting record')
            try:
                # part of the solution : Use timeout
                record = self.dataqueue.get(timeout=5)
                records.append(record)
                print('got record')
            except Exception as e:
                print(e)
                break
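The timeout version above catches every `Exception`, which also hides unrelated errors. Below is a hedged sketch of the same idea that treats only `queue.Empty` as "no more data". `drain_with_timeout` is a hypothetical helper name, not part of the original code, and the timeout is an assumption that should be longer than the longest gap you expect between records. `multiprocessing.Queue.get` raises the same `queue.Empty` exception as the standard `queue` module.

```python
import queue


def drain_with_timeout(q, timeout=5.0):
    """Collect items until no new item arrives within `timeout` seconds."""
    records = []
    while True:
        try:
            record = q.get(timeout=timeout)
        except queue.Empty:
            # Nothing arrived in time: assume the writer is finished.
            break
        records.append(record)
    return records
```

With the class from the question this would be called as `records = drain_with_timeout(self.dataqueue)`. The trade-off compared with a sentinel is that the call always waits one full timeout at the end, and a writer that is merely slow looks the same as one that has finished.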