此链接解释了 QThread 可以通过以下两种方式之一使用:
run()
并执行自定义代码,但无法接收事件。嗯,我想要两者兼得的东西。我有一个
SerialThread
会定期检查串行端口并在可用时接收数据,但主线程可能还需要发送一些数据,并且由于两个 serial
对象不能使用相同的物理串行端口,因此主线程必须通过SerialThread
发送数据。
我已经有了一个实现这种双重功能的基本系统。
class SerialThread(QThread):
class Worker(QObject):
def __init__(self, n: int):
super().__init__()
self.n = n
@pyqtSlot()
def work(self):
print(f"Exec from worker {self.n}...")
print("Work Thread ID:", format(threading.get_ident(), '04X'))
sig_new_data = pyqtSignal(list)
_sig_work = pyqtSignal()
_worker = Worker(1)
_serial = serial.Serial()
def __init__(self):
super().__init__()
self._worker.moveToThread(self)
self._sig_work.connect(self._worker.work)
def run(self):
print("Serial Thread ID:", format(threading.get_ident(), '04X'))
self.__open_com_port()
while True:
self.eventDispatcher().processEvents(QEventLoop.AllEvents) # executes pending events
if not self._serial.is_open and not self.__open_com_port():
self.msleep(2000)
continue
if self.__get_data():
self.sig_new_data.emit()
def queue_work(self): # called from the main thread
self._sig_work.emit() # Adds a pending event to the SerialThread event loop.
问题是,我希望可能有多种类型的工作人员来做不同的事情,或者有一个也接受可调用的
Worker
类。这意味着每次请求某些工作时我都必须创建一个新的 Worker
实例并将 _sig_work 连接到它,发出信号,然后像这样断开它:
def queue_work(thread: SerialThread):
worker = SerialThread.Worker()
worker.moveToThread(thread)
thread._sig_work.connect(worker.work)
thread._sig_work.emit()
thread._sig_work.disconnect(worker.work)
但是,这似乎仅在
Worker
实例在类内部创建或作为全局变量创建时才起作用(例如,当在函数或方法中实例化时它不起作用)。也许这与垃圾收集器有关。
另外,在未来,多个线程可能想要访问串行端口,这意味着我不能使用一个信号(因为它可能被两个线程同时访问)。但
pyqtSignal
的问题在于它必须位于 QObject
的子类内,而我不能只在 queue_work()
中创建新信号。
即使我设法让这些工作正常工作,这似乎也不是一个非常干净的方法。什么是更简单/更好的方法来执行此操作或根本不执行此操作(以某种方式解决多个
serial
对象问题)?
嗯,我按照 musicamante 的建议使用了队列。这个想法曾经闪过我的脑海,但我希望 PyQt 已经有一个解决方案。
class SerialThread(QThread):
class _Work:
def __init__(self, work: ()):
super().__init__()
self.work = work
def run(self):
self.work()
sig_new_data = pyqtSignal(list)
_serial = serial.Serial()
_queue = queue.Queue()
def run(self):
print("Serial Thread ID:", format(threading.get_ident(), '04X'))
self.__open_com_port()
while True:
while not self._quque.empty():
work = self._queue.get_nowait()
cast(SerialThread._Work, work).run()
if not self._serial.is_open and not self.__open_com_port():
self.msleep(2000)
continue
if self.__get_data():
self.sig_new_data.emit()
def queue_work(self): # called from the main thread
work = SerialThread._Work(lambda: do something)
self._queue.put_nowait(work)