Python 中进程终止后的意外行为

问题描述 投票:0回答:1

我有一个多处理程序,旨在在第一个进程完成时终止所有其他进程,使用 multiprocessing.Lock() 来指示该进程的结束,并使用 multiprocessing.Queue() 来存储返回值。

但是,有时程序会卡在queue.get()方法中,就像put()函数还没有完成一样。

这是该问题的简化版本,只有一个子流程。


import multiprocessing

def foo(is_done, queue):
    queue.put(1)
    is_done.set()

    
if __name__ == "__main__":
    while True:
        is_done = multiprocessing.Event()
        queue = multiprocessing.Queue()
        p = multiprocessing.Process(target=foo,args=(is_done,queue))
        p.start()
        is_done.wait()
        p.terminate()
        print(queue.get())
        

由于锁定是在将值放入队列之后设置的,所以我预计进程会在添加值后终止。我注意到终止前的一点延迟解决了问题(也许是因为 put() 不是中间终止?)我很想知道为什么会这样。

python locking python-multiprocessing terminate
1个回答
0
投票

你需要明白两个关键点。

首先是

multiprocessing.Queue
内部是使用Pipe实现的,也就是说它是一个队列协议下进程间通信的类。 可以说,
put
方法是“排队”操作,该项目实际上并不“在队列中”,而是保留在发送者的进程中,直到发生进程间通信,即直到有人尝试获取该项目。物品。 这就是为什么当队列不为空时我们无法加入子进程。 (注意:当物品较小时,上述描述可能不正确。)

第二个是

Process.terminate
是一种强制终止进程的方法,而不是优雅地关闭它。

文档明确指出:

如果在关联进程正在使用管道或队列时使用此方法,则管道或队列可能会损坏并且可能无法被其他进程使用。

这是因为尚未“在队列中”的项目可能会丢失,以及仍在保留它们的进程。

因此,无法保证您的代码将按预期工作。即使看起来像,也只是巧合。

解决方法如下:

import multiprocessing


def foo(is_done, queue):
    queue.put(1)
    is_done.set()


if __name__ == "__main__":
    while True:
        is_done = multiprocessing.Event()
        queue = multiprocessing.Queue()
        p = multiprocessing.Process(target=foo, args=(is_done, queue))
        p.start()
        is_done.wait()
        # p.terminate()  # <-- Do not terminate BEFORE the queue is empty.
        print(queue.get())
        p.join()  # You can safely join the process AFTER the queue is empty.
        p.close()

还有一点值得一提。修改后的代码仍然包含潜在的错误。

这种行为非常令人困惑,但是

put
方法也有可能在“排队”之前退出。 即,可以在队列仍为空时设置事件。

这成为一个问题,尤其是在放置大型项目并执行非阻塞时

get

import multiprocessing

def foo(is_done, queue):
    queue.put(list(range(1_000_000)))  # Something big item.
    is_done.set()

if __name__ == "__main__":
    while True:
        is_done = multiprocessing.Event()
        queue = multiprocessing.Queue()
        p = multiprocessing.Process(target=foo, args=(is_done, queue))
        p.start()
        is_done.wait()
        _ = queue.get(block=False)  # Non-blocking get.
        print("Success!")
        p.join()
        p.close()

当您运行此命令时,您最终会观察到

Empty
异常。这可能是由于“排队”操作稍有延迟,因为它是多线程的。

请注意,由于您的代码使用了阻塞

get
,这对您来说应该不是问题,因为它会等到项目排队为止。

如果修改后的代码仍然不能按预期工作,请考虑使用

multiprocessing.Manager().Queue()
代替。
Manager.Queue
有自己的流程,确保项目一放入就“在队列中”。 这将使行为在许多方面更加可预测。 但是,它的速度明显较慢,因此哪个更好取决于您的用例。

© www.soinside.com 2019 - 2024. All rights reserved.