在 QThread 中创建大量带有信号的 QObject

问题描述 投票:0回答:2

我正在摆弄一个具有保存/加载功能的应用程序。读取保存的文件和加载所有组件都需要足够的时间来暂停程序一段时间。我发现虽然我可以读取保存的文件,但我应该避免在线程内创建任何QObjects(对象需要信号,因此QObject)。为了解决这个问题,我发出保存文件的内容并在外部处理创建对象,但由于对象的创建需要一些时间,所以我不准备放弃这个想法。

我重新创建了一个最小的工作示例,它显示了问题。无论哪种方式(正常方式或线程方式)都可以正确创建对象,但是不会发出信号(或发出到现已被垃圾收集的线程中的 void 中)。我在我的应用程序中使用(d)这种模式,除了我从线程发出我需要的东西并发送到东西,而不是在线程内做东西。

如果这确实是一个坏主意并且违背了良好的编程实践,请告诉我,我将不得不调整或忍受一个口吃的 GUI。我热衷于避免这些口吃的原因是因为我在繁重的操作中使用旋转加载器-gif并且它不断运行它我在一个单独的线程中执行繁重的工作(想一想,也许gif可以在它的自己的线程代替...?)

from PySide6 import QtCore, QtWidgets


class SomeParentContainer(QtCore.QObject):
    some_signal = QtCore.Signal(str)
    
    def __init__(self):
        self.container = list()
        super().__init__()


class SomeObjectWithSignals(QtCore.QObject):
    some_signal = QtCore.Signal(str)
    
    def __init__(self, parent, dummy_id):
        self.parent = parent
        self.dummy_id = dummy_id
        super().__init__()
        self.some_signal.connect(lambda dummy_id: self.parent.some_signal.emit(dummy_id))

    def fake_emit(self):
        self.some_signal.emit(self.dummy_id)
        
        
container_obj = SomeParentContainer()     
        
        
class SomeMainWindow(QtWidgets.QMainWindow):
    def __init__(self):
        super().__init__()
        container_obj.some_signal.connect(lambda dummy_id: print(f"some_signal emitted from child: {dummy_id}"))
        
        self.main_widget = QtWidgets.QWidget()
        self.setCentralWidget(self.main_widget)
        self.layout = QtWidgets.QVBoxLayout()
        
        btn = QtWidgets.QPushButton()
        btn.clicked.connect(self.btn1)
        btn.setText("Add normally")
        self.layout.addWidget(btn)
        
        btn = QtWidgets.QPushButton()
        btn.clicked.connect(self.btn2)
        btn.setText("Add threaded")
        self.layout.addWidget(btn)
        
        btn = QtWidgets.QPushButton()
        btn.clicked.connect(self.btn3)
        btn.setText("Emit signals")
        self.layout.addWidget(btn)
        
        self.main_widget.setLayout(self.layout)
        
    def btn1(self):
        container_obj.container.append(SomeObjectWithSignals(container_obj, "test1"))
        
    def btn2(self):
        class SomeThreadedAction(QtCore.QThread):
            finished = QtCore.Signal()
            
            def run(self):
                container_obj.container.append(SomeObjectWithSignals(container_obj, "test2"))
        self.thread = SomeThreadedAction()
        self.thread.finished.connect(lambda: print("thread finished"))
        self.thread.start()
        
    def btn3(self):
        for objects in container_obj.container:
            print("Should be emitting something next:")
            objects.fake_emit()
            print("-------------------------")


app = QtWidgets.QApplication()
form = SomeMainWindow()
form.show()
app.exec()
python multithreading qt pyqt pyside6
2个回答
0
投票

我不建议在线程之间共享

QObject
实例,因为这会让你的代码变得不必要的复杂。如果将纯 Python 对象的结果从工作线程传输到主线程,并在主线程中从结果创建
QObject
实例,那就简单多了。

但是如果你坚持要转移

QObject
实例,你就得关心这几点了。

  1. 您应该更改

    QObject
    实例的线程关联性。请参阅每线程事件循环

  2. 当您拨打

    connect()
    时,您应该在信号接收器上给出正确的提示。如果您不指定
    QObject
    实例的槽,则会创建一个与调用线程有关联的虚拟接收器。

  3. 您应该保持

    QObject
    实例处于活动状态,直到接收线程拥有它为止。

所以,你可以像下面这样做。

...
class SomeParentContainer(QtCore.QObject):
    ...
    def emit_some_signal(self, dummy_id):
        self.some_signal.emit(dummy_id)

class SomeObjectWithSignals(QtCore.QObject):
    def __init__(self, parent, dummy_id):
        ...
        #self.some_signal.connect(lambda dummy_id: self.parent.some_signal.emit(dummy_id))
        self.moveToThread(parent.thread()) # The point #1.
        self.some_signal.connect(parent.emit_some_signal) # The point #2.
        self.setParent(parent) # The point #3.
...
class SomeMainWindow(QtWidgets.QMainWindow):
    ...
    def btn2(self):
        class SomeThreadedAction(QtCore.QThread):
            #finished = QtCore.Signal() # It's bad to override the QThread.finished.
            obj_created = QtCore.Signal(QtCore.QObject)
            def run(self):
                #container_obj.container.append(SomeObjectWithSignals(container_obj, "test2"))
                # It's bad to update the state of an object
                # owned by the main thread in a worker thread.
                self.obj_created.emit(SomeObjectWithSignals(container_obj, "test2"))
        self.thread = SomeThreadedAction()
        self.thread.finished.connect(lambda: print("thread finished"))
        self.thread.obj_created.connect(container_obj.container.append)
        self.thread.start()
...

0
投票

经过数小时的摆弄并尝试使其与 QtCore.Signal 和线程一起工作后,我决定实现自己的非 Qt 信号。

这可能会给我带来更多问题,但我必须完成这个项目。

如果有人找到这篇文章并想自己尝试,这就是我的结果:

class QueuedSignal:
    num_queues = 0
    _queue = []

    def __init__(self, *args):
        self.args = args
        self.blocked = False
        self._slots = []

    def connect(self, slot):
        if type(slot) is QueuedSignal:
            slot = slot.emit
        if callable(slot):
            self._slots.append(slot)
        else:
            raise ValueError("Slot must be callable")

    def emit(self, *args):
        if self.blocked:
            return
        self.check_args(*args)
        for slot in self._slots:
            if self.num_queues > 0:
                self.add_to_queue(slot, args)
            else:
                self.fire(slot, args)

    def check_args(self, *args):
        if len(args) != len(self.args):
            raise TypeError(f"QueuedSignal expected {len(self.args)} signal(s) but {len(args)} were given.")

        types = tuple(type(arg) for arg in args)
        for type_, expected in zip(types, self.args):
            if not issubclass(type_, expected):
                raise TypeError(f"QueuedSignal expected {self.args} but got {types}")

    def queued(self, is_queued):
        if is_queued:
            QueuedSignal.num_queues = QueuedSignal.num_queues + 1
        else:
            QueuedSignal.num_queues = max(0, QueuedSignal.num_queues - 1)

        if QueuedSignal.num_queues == 0:
            self.empty_queue()

    def block_signals(self, is_blocked):
        self.blocked = is_blocked

    @classmethod
    def queued(cls, is_queued):
        if is_queued:
            cls.num_queues = cls.num_queues + 1
        else:
            cls.num_queues = max(0, cls.num_queues - 1)

        if cls.num_queues == 0:
            cls.empty_queue()

    @classmethod
    def add_to_queue(cls, slot, args):
        cls._queue.append((slot, args))

    @classmethod
    def empty_queue(cls):
        queue = []
        for signal in cls._queue:
            if signal not in queue:
                queue.append(signal)
        for slot, args in queue:
            cls.fire(slot, args)
        cls._queue.clear()

    @staticmethod
    def fire(slot, args):
        signature = inspect.signature(slot)
        parameters = signature.parameters
        num_parameters = len(parameters)
        if num_parameters == 0 or any(parameter.kind != parameter.VAR_POSITIONAL for parameter in parameters.values()):
            args = args[:num_parameters]
        slot(*args)
© www.soinside.com 2019 - 2024. All rights reserved.