串行实时绘图仪 python

问题描述 投票:0回答:1

下面的代码绘制了从串行端口接收到的数字。然而,

locals()[self.axes_mapping[axis]]
的值并不是函数
receive_data
接收到的所有数据。似乎有些价值观丢失了。如果我尝试打印 x 的值,它们与接收到的数据相对应。如何确保绘制所有收到的数据?

import sys
from PyQt6.QtWidgets import QApplication, QMainWindow
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore
import numpy as np
from collections import deque
import threading
import queue
import serial

BUFFER_SIZE = 100

class RealTimePlot(QMainWindow):
    def __init__(self):
        super().__init__()

        self.setup_ui()
    
        self.serial_port = serial.Serial('COM6', 115200)
        self.N = 100
        self.fs = 1000  # Sampling frequency in Hz (adjust according to your setup)
        self.T = 1/self.fs
        self.x_values = np.arange(0, self.N*self.T, self.T)

        # Circular buffers for time domain plots
        self.axes = ['X', 'RMS']

        self.z_values = {axis: deque([0] * self.N, maxlen=self.N) for axis in self.axes}
        self.z_index = {axis: 0 for axis in self.axes}

        # Axes variable mapping
        self.axes_mapping = {'X': 'x', 'RMS': 'rms'}

        # Plotting setup
        self.setup_plots()

        self.data_queue = queue.Queue()
        # Lock for synchronizing access to the data queue
        self.data_queue_lock = threading.Lock()

        # Create and start the receiving thread
        self.receive_thread = threading.Thread(target=self.receive_data)
        self.receive_thread.daemon = True
        self.receive_thread.start()

        # Start the animation
        self.timer = QtCore.QTimer(self)
        self.timer.timeout.connect(self.update_plot)
        self.timer.start(10)

    def setup_ui(self):
        self.central_widget = pg.GraphicsLayoutWidget()
        self.setCentralWidget(self.central_widget)

    def setup_plots(self):
        self.plots = {axis: self.central_widget.addPlot(row=i, col=0, title=f"<span style='color: #ffffff; font-weight: bold; font-size: 15px'>Time Domain - {axis} Axis</span>")
                      for i, axis in enumerate(self.axes)}

        for plot in self.plots.values():
            plot.setLabel('bottom', 'Time', 's')
            # plot.setLabel('left', 'Amplitude', 'g')
            plot.setYRange(-2, 5000)


        linha1 = pg.mkPen((52, 255, 52), width=2) # R G B & width
        # linha4 = pg.mkPen((255, 255, 255), width=2) 
        self.lines = {axis: plot.plot(pen=linha1) for axis, plot in self.plots.items()}
        # self.lines_fft = {axis: plot.plot(pen=linha4) for axis, plot in self.plots_fft.items()}
        self.plots['RMS'].setYRange(0, 5000)

    def receive_data(self):
        data_buffer = np.zeros((BUFFER_SIZE), dtype=int)
        data_cnt = 0
        rx_flag = True
        while True:
            if self.serial_port.in_waiting > 0.0:  # Check if there is data waiting
                    data_str =  int.from_bytes(self.serial_port.read(2), byteorder='little', signed = False)
                    rx_flag = True

            with self.data_queue_lock:
                if rx_flag == True:
                    data_buffer[data_cnt] = data_str
                    data_cnt = data_cnt+1
                    rx_flag = False

                # Check if the buffer size is reached, then update the plot
                if data_cnt >= BUFFER_SIZE:
                    self.data_queue.put(data_buffer)
                    data_buffer = np.zeros((BUFFER_SIZE), dtype=int)
                    data_cnt = 0
                    

    def calculate_rms(self, x):
        return np.sqrt(np.mean(np.square([x])))

    def update_plot(self):
        with self.data_queue_lock:
            while not self.data_queue.empty():
                data_buffer = self.data_queue.get()

                for data_str in data_buffer:
                    x = data_str
                    rms = self.calculate_rms(data_buffer)

                for axis in self.axes:
                    self.z_values[axis].append(locals()[self.axes_mapping[axis]])
                    self.lines[axis].setData(self.x_values, self.z_values[axis])
                    print(locals()[self.axes_mapping[axis]])
        return self.lines.values()
    

def closeEvent(self, event):
    self.csv_file.close()
    event.accept()

def main():
    app = QApplication(sys.argv)
    window = RealTimePlot()
    window.show()
    sys.exit(app.exec())

if __name__ == '__main__':
    main()

示例来自这里

https://www.reddit.com/r/embedded/comments/18h9smt/attempted_switch_from_dpc_blue_screen_error_while/

python multithreading collections deque pyqtgraph
1个回答
0
投票

这段代码

for data_str in data_buffer:
    x = data_str  # x = one item, drops the rest
    rms = self.calculate_rms(data_buffer)

for axis in self.axes:
    self.z_values[axis].append(locals()[self.axes_mapping[axis]])
    self.lines[axis].setData(self.x_values, self.z_values[axis])

应该是

x = data_buffer  # x = all items
rms = [self.calculate_rms(data_buffer)]

for axis in self.axes:
    self.z_values[axis].extend(locals()[self.axes_mapping[axis]])
    self.lines[axis].setData(self.x_values, self.z_values[axis])

两个轴也应该有不同的时间尺度,因为第二个轴是第一个轴中 100 个点的平均值。

© www.soinside.com 2019 - 2024. All rights reserved.