我在 PyQt6 中遇到线程问题。我在How to stop a QThread from the GUI and rebuild it with a separate dialog and a finished event.
的解决方案中使用了很好的例子。理论上,我要创建的是一个连接对话框,它打开与 oracledb 的数据库连接。虽然,如果我在连接对话框中单击连接,它不应该静默连接,但它应该显示一个取消对话框,如果我愿意,我可以选择取消当前的连接尝试。 这个功能稍后应该用于我程序中对数据库的每个查询,连接对话框只是一个例子。
我当前的问题是取消对话框上的按钮未显示。即使我使用线程让工作人员连接,取消对话框也会以某种方式被冻结。
为此,我创建了以下简化代码示例:
import time
from PyQt6.QtCore import *
from PyQt6.QtGui import *
from PyQt6.QtWidgets import *
class ConnectionDialog(QDialog):
def __init__(self):
super().__init__()
self.database = -1
self.setFixedSize(QSize(200, 100))
# create CancelDialog but do not display yet
self.cancel_dialog = CancelDialog(self)
# creat a thread
self.thread = QThread()
self.thread.start()
# creat a worker and put it on thread
self.worker = Worker()
self.worker.moveToThread(self.thread)
# create button which triggers the cancel_dialog and therefore the connection attempt
self.btnStart = QPushButton("Connect")
self.btnStart.clicked.connect(self.start_worker)
# if the worker emits finished call helper function end_thread
self.worker.finished.connect(self.end_thread)
# create dummy layout to display the button
self.layout = QHBoxLayout()
self.layout.addWidget(self.btnStart)
self.setLayout(self.layout)
# helper function to quit and wait for the thread
def end_thread(self, connection):
self.worker.stop()
self.thread.quit()
self.thread.wait()
print(f"Connected to {connection}")
self.close()
# push connection to self.database for further use
self.database = connection
# helper function to start worker in thread
def start_worker(self):
self.cancel_dialog.show()
self.worker.task()
class CancelDialog(QDialog):
def __init__(self, parent):
super().__init__(parent)
self.setModal(True)
# create button for the cancel operation for the thread of the parent
self.btnStop = QPushButton("Cancel")
self.btnStop.clicked.connect(lambda: self.cancel_thread())
# create dummy layout to display the button
self.layout = QHBoxLayout()
self.layout.addWidget(self.btnStop)
self.setLayout(self.layout)
# helper function when pressing cancel button
def cancel_thread(self):
# stop worker
self.parent().worker.stop()
# quit thread, this time we dont want to wait for the thread
self.parent().thread.quit()
print("Canceled")
self.close()
# push error value to self.database
self.parent().database = -1
class Worker(QObject):
"Object managing the simulation"
finished = pyqtSignal(int)
def __init__(self):
super().__init__()
self._isRunning = True
self.connection = -1
def task(self):
if not self._isRunning:
self._isRunning = True
# this simulates a database connection
print("connecting...")
print("fake delay before connection")
time.sleep(3)
print("really connecting now")
self.connection = 123
print("fake delay after connection")
time.sleep(3)
print("really connecting now")
self.finished.emit(self.connection)
print("finished connecting")
def stop(self):
self._isRunning = False
if self.connection:
self.connection = 0
print("Canceled")
if __name__ == "__main__":
app = QApplication(sys.argv)
simul = ConnectionDialog()
simul.show()
sys.exit(app.exec())