考虑以下实现 QThread 的类:
from PyQt6.QtCore import QThread
from stream_consumer.streaming_worker import StreamingWorker
class GuiStreamerController:
def __init__(self, callback_function) -> None:
"""Initialize the class
Args:
callback_function (function): The GUI function that will process results in the GUI thread.
"""
self._thread = QThread()
self._worker = StreamingWorker(callback_function)
self._worker.moveToThread(self._thread)
self._thread.started.connect(self._worker.run)
self._is_running = False
def run(self, local_stream_message_criteria=None) -> None:
"""Start the streaming process.
Args:
local_stream_message_criteria (StreamMessageCriteria): The stream message criteria. Optional.
- Starts the Qthread (which starts the streaming process.)
- Implicitly calls the *StreamingWorker* *run* method (assigned in this class' __init__)
"""
if self._is_running:
return
if local_stream_message_criteria:
self._worker.stream_from_local(local_stream_message_criteria)
self._thread.start()
self._is_running = True
def quit(self) -> None:
"""Quit the streaming process."""
self._worker.quit()
self._thread.quit()
现在考虑这个非常简单的测试脚本:
...
class TestQCoreApplication:
def test_functionality(self):
self.app = QCoreApplication([])
self.streamer = GuiStreamerController(_handle_stream_result)
self.streamer.run(local_stream_message_criteria)
print('Streamer Started')
QTest.qWait(5000)
self.streamer.quit()
print('Streamer Quit')
self.app.quit()
# Block 1
# if __name__ == '__main__':
# test = TestQCoreApplication()
# test.test_functionality()
# Block 2
# test = TestQCoreApplication()
# test.test_functionality()
# Block 3
def test_this():
test = TestQCoreApplication()
test.test_functionality()
test_this()
这大部分都按预期运行,除了调用 self.streamer.quit() 会引发以下错误:
QThread: Destroyed while thread is still running
但是,如果我注释掉“Block 3”并启用“Block 1”或“Block 2”,它就可以正常工作。
抱歉,这不是一个完全可重现的示例,因为代码太多了。但是,我认为问题的答案源于从函数内调用 QCoreApplication,所以我希望有人可以提供一个简单的解释和解决方法。
请注意,我尝试在 QThread 上调用 wait(),但它只是无限期地挂起。如果我给它一个超时,错误仍然会在它结束时出现。
另请注意,我的最终目标是从 PyTest / pytest-qt 运行测试,但我遇到了相同的错误。我的想法是,如果我能解决这个简单的例子,我就可以在 PyTest 中解决它。
任何想法将不胜感激!
编辑
我忘了提及,除了在 QCoreApplication 下运行时从脚本根调用的 GuiStreamerController 按预期运行之外,它在 QApplication 循环内调用时也按预期运行。
作为另一个实验,我创建了以下课程:
from PyQt6.QtCore import QCoreApplication
from stream_consumer.gui_streamer_controller import GuiStreamerController
class StreamRunner:
def __init__(
self,
local_stream_message_criteria,
max_results: int = None) -> None:
self._max_results = max_results
self._local_stream_message_criteria = local_stream_message_criteria
self._app = QCoreApplication([])
self._streamer = GuiStreamerController(self._handle_stream_result)
self._result_count = 0
self._results = []
@property
def result_count(self):
"""Return the number of stream results"""
return self._result_count
@property
def results(self):
"""Return the stream results"""
return self._results
def run(self):
"""Start a stream run."""""
self._streamer.run(self._local_stream_message_criteria)
# THIS TIME exec is called!!!!
self._app.exec()
def _quit(self) -> None:
"""Quit a stream run."""
self._streamer.quit()
self._app.quit()
def _handle_stream_result(self, result):
"""Handle the stream results"""
self._results.append(result)
self._result_count += 1
if self._max_results and self._result_count >= self._max_results:
self._quit()
在第 33 行,它调用 exec(),更接近地模仿在 QApplication 下运行时发生的情况。但是,与我之前的脚本一样,如果我实例化此类,并从脚本的根调用其 run() 方法,一切都很好,但如果我从函数调用它,则会遇到同样的问题。
worker.quit()
方法旨在终止StreamingWorker
对象内的循环或进程,该对象似乎是异步管理任务,可能使用ray.io
进行多处理。然而,worker.quit()
在优雅地停止 QThread
方面的有效性似乎值得怀疑。如果 worker.run()
未返回,则线程保持活动状态,从而导致 QThread: Destroyed while thread is still running
错误。
当
QCoreApplication
在函数内而不是在脚本的根部启动时,问题似乎会加剧。这可能与 Qt
事件循环的管理和终止方式以及该循环内对象的生命周期有关。在通常包含事件循环的 QApplication
下运行不会出现相同的问题,这表明该问题与事件循环的管理或事件循环的缺失有关。
评论中的讨论指出了垃圾收集和测试对象寿命的问题。当全局声明对象时,它们会在脚本持续时间内持续存在,从而减轻即时错误。这表明在函数中声明时对象的销毁顺序(由于垃圾回收)可能会导致该问题。
使用全局变量来解决问题被认为是解决症状而不是根本问题的解决方法。真正的问题可能在于
worker.quit()
如何无法确保线程干净终止。依赖此类解决方法而不解决根本问题可能会导致以后出现更严重、更难以诊断的问题,尤其是在处理多线程和多处理的复杂性时。
还在评论中,讨论建议重新考虑
quit()
功能的设计和实现。确保 worker.quit()
和任何相关的清理方法有效地管理线程终止和资源清理至关重要,特别是在应用程序的行为可能与生产不同的测试场景中。
最初的问题似乎源于
worker.quit()
的实施方式及其无法保证 QThread
的终止。当在函数启动的 QCoreApplication
中运行时,对象生命周期和垃圾收集的细微差别会放大这个问题。
更可靠的解决方案可能涉及确保 worker.quit()
可靠地停止所有后台操作并向线程发出信号以干净地终止。这可能涉及在 StreamingWorker
内实现额外的同步机制或条件,以确保 run()
在调用 quit() 时可以正常退出。
对于测试,尤其是使用 PyTest 这样的框架,确保测试环境的设置和拆卸一致非常重要。这可能涉及重新审视如何管理
QCoreApplication
,可能使用固定装置来控制应用程序和流对象的生命周期。
最后,本次讨论强调了彻底理解和正确管理多线程、多处理和 Qt 事件循环之间交互的重要性,特别是在复杂的异步操作中。