在 Python 中线程读取串行端口(使用 GUI)

问题描述 投票:0回答:1

我想在运行 GUI 时每当有数据要从串行端口读取时触发一个事件。

pySerial
模块显然具有实验性功能,但它没有特别详细的记录(我在API中找不到任何有用的示例)。

这个问题似乎处理相同或至少非常相似的任务,但没有提供复制它的说明或工作代码示例。

我想出了这段代码:

import tkinter as tk import serial import threading # Create GUI window window = tk.Tk() # Initialize the port myPort = serial.Serial('/dev/ttyUSB0') # Function to call whenever there is data to be read def readFunc(port): port.readline() print('Line read') # Configure threading t1 = threading.Thread(target = readFunc, args=[myPort]) t1.start() # Main loop of the window window.mainloop()
运行它确实会触发该事件,但只会触发一次。这是为什么?是否有“推荐”的方法来做到这一点,例如使用 

pySerial

 本身的功能?

或者,我也会运行该函数来读取和处理事件数据,就像使用 GUI 元素一样。如果这是更好的解决方案,该怎么做?

相关问题(未回答),可能会使这个问题重复

编辑:这是从下面的答案中得出的一个最小示例,每当将数据读取到传入数据时,都会更改标签的文本:

import tkinter as tk from serial import Serial from serial.threaded import ReaderThread, Protocol app = tk.Tk() label = tk.Label(text="A Label") label.pack() class SerialReaderProtocolRaw(Protocol): port = None def connection_made(self, transport): """Called when reader thread is started""" print("Connected, ready to receive data...") def data_received(self, data): """Called with snippets received from the serial port""" updateLabelData(data) def updateLabelData(data): data = data.decode("utf-8") label['text']=data app.update_idletasks() # Initiate serial port serial_port = Serial("/dev/ttyACM0") # Initiate ReaderThread reader = ReaderThread(serial_port, SerialReaderProtocolRaw) # Start reader reader.start() app.mainloop()
    
python multithreading tkinter serial-port
1个回答
4
投票
当您从另一个正在运行的线程更新 GUI 时,您主要关心的是线程安全。

为了实现这一点,我们可以使用 .after() 方法,该方法为任何给定的 tk 小部件执行回调。

您的请求的另一部分是使用线程串行读取器。

这可以通过使用
ReaderThread 和 Protocol 来实现。

您可以选择两种协议:

这里是工作代码示例,其中包含上面提到的两种协议,因此您可以选择适合您的一种。请记住,来自串行端口的所有数据都只是原始字节。

import tkinter as tk from serial import Serial from serial.threaded import ReaderThread, Protocol, LineReader class SerialReaderProtocolRaw(Protocol): tk_listener = None def connection_made(self, transport): """Called when reader thread is started""" if self.tk_listener is None: raise Exception("tk_listener must be set before connecting to the socket!") print("Connected, ready to receive data...") def data_received(self, data): """Called with snippets received from the serial port""" self.tk_listener.after(0, self.tk_listener.on_data, data.decode()) class SerialReaderProtocolLine(LineReader): tk_listener = None # Terminators should be b'\r\n' for windows and b'\n' for Linux/MacOS TERMINATOR = b'\r\n' def connection_made(self, transport): """Called when reader thread is started""" if self.tk_listener is None: raise Exception("tk_listener must be set before connecting to the socket!") super().connection_made(transport) print("Connected, ready to receive data...") def handle_line(self, line): """New line waiting to be processed""" # Execute our callback in tk self.tk_listener.after(0, self.tk_listener.on_data, line) class MainFrame(tk.Frame): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.listbox = tk.Listbox(self) self.listbox.pack() self.pack() def on_data(self, data): print("Called from tk Thread:", data) self.listbox.insert(tk.END, data) if __name__ == '__main__': app = tk.Tk() main_frame = MainFrame() # Set listener to our reader SerialReaderProtocolLine.tk_listener = main_frame # Initiate serial port serial_port = Serial("/dev/ttyUSB0") # Initiate ReaderThread reader = ReaderThread(serial_port, SerialReaderProtocolLine) # Start reader reader.start() app.mainloop()
    
© www.soinside.com 2019 - 2024. All rights reserved.