我正在开发一个Python多处理应用程序,其中一个进程(生产)将项目放入multiprocessing.Queue中,另一个进程(监听)应该将它们取出。队列有最大大小。当队列已满时,product 会尝试从队列中删除一项,然后放入新一项。但是,我遇到一个问题,即使从队列中删除项目后,尝试放入新项目仍然会导致完全异常。
使用
JoinableQueue
并致电 task_done
尚未解决此问题。
这是我的代码的简化版本:
import multiprocessing as mp
from queue import Full, Empty
def listen(queue: mp.Queue) -> None:
while True:
try:
queue.get_nowait()
except Empty:
pass
def produce(queue: mp.Queue) -> None:
item = "qwerty"
while True:
try:
queue.put_nowait(item)
except Full:
try:
queue.get_nowait()
except Empty:
pass
try:
queue.put_nowait(item)
except Full:
print("Full")
def run():
queue = mp.Queue(maxsize=10)
p1 = mp.Process(target=produce, args=(queue,))
p2 = mp.Process(target=listen, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == "__main__":
run()
看来您有潜在的竞争状况。我们首先要了解队列是如何实现的:
multiprocessing.Queue
实例构建在 multiprocessing.Pipe
及其 multiprocessing.connection.Connection
实例之上。即使您创建一个大小不受限制的队列,底层管道也肯定会限制通过一个连接发送的数据量,而无需通过其他连接同时读取数据。因此,当对队列执行 put
时,该项目仅附加到内部 collections.dequeue
。它是一个内部队列线程,已启动等待将项目放置在双端队列上,该线程实际上从出队中弹出项目并将其发送到连接。通过这种方式,发出原始 put
的线程永远不会阻塞(假设队列具有无限大小),因为一旦该项目被附加到出队,“放置”线程就不再需要执行任何操作。但是,如果没有人通过其他连接读取数据,则从出列中获取这些项目并将其发送到管道连接的内部队列线程将会阻塞。
因此,为了能够从队列中获取项目,必须发生两件事:步骤 1 是将项目附加到内部双端队列,步骤 2 是线程从出队中获取项目并将其发送到连接。当队列被初始化为有限大小时,一旦出列有那么多项目,队列就会显得已满,但是在从队列中获取项目之前,必须给队列的内部线程一个运行的机会,并且弹出双端队列的一项。如果调用
get_nowait
时没有发生这种情况,则队列仍将满。
在您的
produce
代码中,您将发出 put_nowait
调用,直到获得“完整”条件。但是,这样放入的所有项目很可能仍然位于出队中,因为从出队读取的线程可能尚未调度。因此,当您执行 get_nowait
调用来检索一个项目以便允许您放置另一个项目时,可能无法通过接收连接接收到数据,因此双端队列仍处于其最大容量。因此,当您尝试发送新消息时,您会发现队列仍然已满。在我们尝试从队列中检索项目之前,当发现已满条件时,让正在处理来自出队的消息的线程有机会执行其操作,方法是插入睡眠调用。请阅读我添加到以下代码中的注释:
put_nowait
假设休眠 0.1 秒足以让队列的内部线程有机会处理其双端队列,以上代码现在将按预期运行。