下面的代码绘制了从串行端口接收到的数字。然而,
locals()[self.axes_mapping[axis]]
的值并不是函数receive_data
接收到的所有数据。似乎有些价值观丢失了。如果我尝试打印 x 的值,它们与接收到的数据相对应。如何确保绘制所有收到的数据?
import sys
from PyQt6.QtWidgets import QApplication, QMainWindow
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore
import numpy as np
from collections import deque
import threading
import queue
import serial
BUFFER_SIZE = 100
class RealTimePlot(QMainWindow):
def __init__(self):
super().__init__()
self.setup_ui()
self.serial_port = serial.Serial('COM6', 115200)
self.N = 100
self.fs = 1000 # Sampling frequency in Hz (adjust according to your setup)
self.T = 1/self.fs
self.x_values = np.arange(0, self.N*self.T, self.T)
# Circular buffers for time domain plots
self.axes = ['X', 'RMS']
self.z_values = {axis: deque([0] * self.N, maxlen=self.N) for axis in self.axes}
self.z_index = {axis: 0 for axis in self.axes}
# Axes variable mapping
self.axes_mapping = {'X': 'x', 'RMS': 'rms'}
# Plotting setup
self.setup_plots()
self.data_queue = queue.Queue()
# Lock for synchronizing access to the data queue
self.data_queue_lock = threading.Lock()
# Create and start the receiving thread
self.receive_thread = threading.Thread(target=self.receive_data)
self.receive_thread.daemon = True
self.receive_thread.start()
# Start the animation
self.timer = QtCore.QTimer(self)
self.timer.timeout.connect(self.update_plot)
self.timer.start(10)
def setup_ui(self):
self.central_widget = pg.GraphicsLayoutWidget()
self.setCentralWidget(self.central_widget)
def setup_plots(self):
self.plots = {axis: self.central_widget.addPlot(row=i, col=0, title=f"<span style='color: #ffffff; font-weight: bold; font-size: 15px'>Time Domain - {axis} Axis</span>")
for i, axis in enumerate(self.axes)}
for plot in self.plots.values():
plot.setLabel('bottom', 'Time', 's')
# plot.setLabel('left', 'Amplitude', 'g')
plot.setYRange(-2, 5000)
linha1 = pg.mkPen((52, 255, 52), width=2) # R G B & width
# linha4 = pg.mkPen((255, 255, 255), width=2)
self.lines = {axis: plot.plot(pen=linha1) for axis, plot in self.plots.items()}
# self.lines_fft = {axis: plot.plot(pen=linha4) for axis, plot in self.plots_fft.items()}
self.plots['RMS'].setYRange(0, 5000)
def receive_data(self):
data_buffer = np.zeros((BUFFER_SIZE), dtype=int)
data_cnt = 0
rx_flag = True
while True:
if self.serial_port.in_waiting > 0.0: # Check if there is data waiting
data_str = int.from_bytes(self.serial_port.read(2), byteorder='little', signed = False)
rx_flag = True
with self.data_queue_lock:
if rx_flag == True:
data_buffer[data_cnt] = data_str
data_cnt = data_cnt+1
rx_flag = False
# Check if the buffer size is reached, then update the plot
if data_cnt >= BUFFER_SIZE:
self.data_queue.put(data_buffer)
data_buffer = np.zeros((BUFFER_SIZE), dtype=int)
data_cnt = 0
def calculate_rms(self, x):
return np.sqrt(np.mean(np.square([x])))
def update_plot(self):
with self.data_queue_lock:
while not self.data_queue.empty():
data_buffer = self.data_queue.get()
for data_str in data_buffer:
x = data_str
rms = self.calculate_rms(data_buffer)
for axis in self.axes:
self.z_values[axis].append(locals()[self.axes_mapping[axis]])
self.lines[axis].setData(self.x_values, self.z_values[axis])
print(locals()[self.axes_mapping[axis]])
return self.lines.values()
def closeEvent(self, event):
self.csv_file.close()
event.accept()
def main():
app = QApplication(sys.argv)
window = RealTimePlot()
window.show()
sys.exit(app.exec())
if __name__ == '__main__':
main()
示例来自这里
这段代码
for data_str in data_buffer:
x = data_str # x = one item, drops the rest
rms = self.calculate_rms(data_buffer)
for axis in self.axes:
self.z_values[axis].append(locals()[self.axes_mapping[axis]])
self.lines[axis].setData(self.x_values, self.z_values[axis])
应该是
x = data_buffer # x = all items
rms = [self.calculate_rms(data_buffer)]
for axis in self.axes:
self.z_values[axis].extend(locals()[self.axes_mapping[axis]])
self.lines[axis].setData(self.x_values, self.z_values[axis])
两个轴也应该有不同的时间尺度,因为第二个轴是第一个轴中 100 个点的平均值。