Python 多处理队列,剩余项目的块

问题描述 投票:0回答:1

使用 Python 中的多处理库运行两个线程,将项目放入子线程的队列中并从父线程获取它们,挂在最后一个项目上。

问题是为什么会发生这种情况? 我们如何让它发挥作用?

完整的代码在这里。它适用于旧的 USB 光谱仪

https://drive.google.com/file/d/18kRFCnqO1GfAdrbgPYOFXp6LUdjy014s/view?usp=drive_link

https://drive.google.com/file/d/1Q0b0i_VLBBpKIapGReJ4wdVr1CiJHS1a/view?usp=drive_link

以下是描述问题所在的粗略摘录。

在设置中我们有:

    from multiprocessing import Process, Queue, Value

    class Gizmo:

      #blah blah blah, set up the hardware and create a multiprocessing Queue

      self.dataqueue = Queue()

    
    def startreader(self,nframes,nsets)
    
        # clear the dataqueue
        while not self.dataqueue.empty():
            try:
                self.dataqueue.get(False)
                print( 'queue got entry' )
            except Exception as e:
                print( 'queue.get', e )
                break

        print( 'creating reader thread')
        self.readerthread = Process( target = self.Reader_, args=(nframes,nsets) )
        if self.readerthread is None:
            print( 'creating reader thread failed')            
            return False

        print( 'starting reader thread')
        self.readerthread.start()

     def Reader_(self,nrames,nsets):

         #blah blah blah, get a bunch of records and then

         for n,record in enumerate(records):
             self.dataqueue.put( record )
             print( 'reader put record', n )

         return


     def savedata(self)

         while not self.dataqueue.empty():
             print( 'getting record')
             try:
                record = self.dataqueue.get_nowait()
                records.append(record )
                print( 'got record')
             except Exception as e:
                print(e)

          $blah blah blah and write it all to a disk file

当我们运行此命令时,我们会在第二个线程中看到从读取器推送到队列的四个记录。

reader put record 0
reader put record 1
reader put record 2
reader put record 3

然后,在看到阅读器退出后,我们调用 save()。 我们看到检索到的 4 条记录中的 3 条,然后它继续尝试获取第四条记录。

getting records
getting record
got record
getting record
got record
getting record
got record
getting record

再次,问题是:

为什么会挂起?

我们如何让它发挥作用?

python multiprocessing
1个回答
0
投票

这是 mp 和队列的常见问题。这是因为写入队列的进程(

Reader_
函数)在
savedata
函数从队列中检索所有项目之前完成并退出。即,在主进程检索所有项目之前,将项目放入队列的进程终止

它挂起是因为:当

Reader_
函数完成执行时,与其关联的进程终止。如果队列仍然持有项目并且进程终止,队列可能无法正确刷新所有项目。

savedata
方法中,您尝试从队列中获取记录,直到队列为空(完全为空)。如果队列的管道关闭并且写入进程退出时:`Queue.get_nowait() 将挂起,等待更多永远不会得到的数据。

在代码中添加注释,并带有文字

#part of the solution

def startreader(self, nframes, nsets):
    # clear the dataqueue
    while not self.dataqueue.empty():
        try:
            self.dataqueue.get(False)
            print('queue got entry')
        except Exception as e:
            print('queue.get', e)
            break

    print('creating reader thread')
    self.readerthread = Process(target=self.Reader_, args=(nframes, nsets))
    if self.readerthread is None:
        print('creating reader thread failed')
        return False

    print('starting reader thread')
    self.readerthread.start()

    # part of the solution : join the process
    # Wait for the reader process to complete
    self.readerthread.join() 
    

def Reader_(self, nframes, nsets):
    # blah blah blah, get a bunch of records and then
    for n, record in enumerate(records):
        self.dataqueue.put(record)
        print('reader put record', n)

    # part of the solution : Put a value to signal the end of the queue
    self.dataqueue.put(None)

def savedata(self):
    while True:
        print('getting record')
        try:
            record = self.dataqueue.get_nowait()
            
            # part of the solution : Check for sentinel
            if record is None:  
                break
            records.append(record)
            print('got record')
        except Exception as e:
            print(e)
            break

    # blah blah blah and write it all to a disk file
    


def savedata(self):
    while True:
        print('getting record')
        try:
        
            # part of the solution : Use timeout
            record = self.dataqueue.get(timeout=5) 
             
            records.append(record)
            print('got record')
        except Exception as e:
            print(e)
            break
© www.soinside.com 2019 - 2024. All rights reserved.