当在其他地方实例化另一个对象时,如何从另一个对象接收PyQt信号?

问题描述 投票:0回答:1

我正在构建一个系统,以通过Bluetooth LE将基于Arduino的传感器连接到RPi,并在GUI上显示信息(温度和电池寿命)。我的程序中有两个主要的类,一个管理GUI,一个管理BLE连接(class HubSensor)。一个HubSensor对象获取每个传感器的MAC地址,并应该发出带有附加元组的信号,该元组包含感测到的温度,电池寿命和索引整数,以使主程序知道它是哪个传感器。 HubSensor每秒获取一次它的信息,并且应该每次发送一次信号。 (已经建立了输入验证,但与我的问题无关。)到目前为止,大多数验证工作正常。

我的问题是我不知道如何创建一个插槽来接收信号,以便它可以更新显示(并在CSV文件中保留日志)。我正在使用BluePy库来管理BLE连接,对我来说,这是另外的挑战。

所以,这就是我的程序的工作方式(我认为)。每个线程(因为我有多个传感器)都会创建一个HubSensor对象。当对象建立BLE连接时,它会创建一个MyDelegate对象(从BluePy的DefaultDelegate继承。在MyDelegate对象内部,发出Qt信号。我需要在所有这些类之外访问该信号)由于我不知道所创建的MyDelegate对象的名称,因此我不知道如何使用它。

我已经尝试让上述每个类都继承彼此的特征,但是我不确定我做对了吗。

来自trailerTempDisplay.py

import sys
from PyQt5.QtCore import *
from HubSensor import *
from PyQt5 import QtWidgets, uic
from bluepy.btle import *

from datetime import datetime


# mac addresses for the sensors.  later, this will need a function to allow new devices to connect
bt_addrs = ['c1:49:02:59:ae:50', 'f3:ad:ed:46:ea:16']

app = QtWidgets.QApplication(sys.argv)


class Worker(QRunnable):
    def __init__(self, macAddress, ind):
        super(Worker, self).__init__()
        self.macAddress = macAddress
        self.ind = ind

    @pyqtSlot()
    #this is where each sensor exists.  each object is created and runs here
    def run(self):
        self.sensor = HubSensor(self.macAddress, self.ind)
        self.sensor.notified.connect(self.updateValues())

#close button
def buttonClicked():
    app.closeAllWindows()

window = uic.loadUi("mainwindow.ui")
window.pushButton.clicked.connect(buttonClicked)


def updateValues(self):
    print("value updated")       # debugging


window.show()
window.threadpool = QThreadPool()
index = 0
for addr in bt_addrs:
    worker = Worker(addr, index)
    index += 1
    window.threadpool.start(worker)
app.exec()

来自HubSensor.py

from bluepy.btle import *
from PyQt5.QtCore import QObject, pyqtSignal


class MyDelegate(DefaultDelegate, QObject):
    def __init__(self, index):
        DefaultDelegate.__init__(self)
        QObject.__init__(self)
        self.index = index

    # class variable for the notified signal
    notified = pyqtSignal(tuple)

    def handleNotification(self, cHandle, data):
        # exception handling prevents bad data from being passed.  cHandle is not used but might be useful later
        try:

            # defining the sensorData tuple.  1, 2, and 3 are dummy values
            self.sensorData = (1, 2, 3)
            self.notified.emit(self.sensorData)  # this should emit a signal every time the function is called and send a tuple with temp, battery, and sensor index(id)

        except (ValueError, IndexError):
            pass


class HubSensor(MyDelegate):
    # constructor.  connects to device defined by mac address and position.
    # uuid is static and should not change

    def __init__(self, mac, index):
        self.index = index  # keeps track of sensor position
        self.mac = mac
        self.p = Peripheral(self.mac, 'random')  # adafruit feathers must be 'random' for some reason
        self.p.setDelegate(MyDelegate(self.index))
        self.p.writeCharacteristic(35, b'\x01\x00')  # writing these bits to handle '35' enables notifications
        while True:
            if self.p.waitForNotifications(1):
                # when a bluetooth notification is received, handleNotification is called
                continue

当我运行程序时,控制台上未显示“值已更新”。它应该每秒大约弹出两次,然后重复一次。稍后,我将添加将值传递到GUI显示的部分。

让我提前道歉,因为我仍然还是一个初学者。我认为我已经将代码的所有相关部分包括在内,但我不确定。另外,我很确定我的术语在某些地方是不正确的,因此希望大家都能理解我的实际意思。预先感谢您能为我提供的任何帮助!

python pyqt bluetooth-lowenergy
1个回答
0
投票

您的代码令人困惑,例如HubSensor是其属性为Peripheral的委托,而Peripheral具有另一个委托,以及其他问题。

因此,我创建了PeripheralManager类,而不是依靠您的代码,该类将通过信号通知分配的外围设备接收的信息。对于无限循环,它将使用threading.Thread在线程中处理。

import threading

from PyQt5 import QtCore, QtWidgets, uic
from bluepy.btle import DefaultDelegate, Peripheral, BTLEDisconnectError


class PeripheralManager(QtCore.QObject, DefaultDelegate):
    notified = QtCore.pyqtSignal(bytes)

    def __init__(self, peripheral, parent=None):
        super().__init__(parent)
        self._peripheral = peripheral
        self.peripheral.setDelegate(self)
        self.peripheral.writeCharacteristic(35, b"\x01\x00")
        threading.Thread(target=self._manage_notifications, daemon=True).start()

    @property
    def peripheral(self):
        return self._peripheral

    def handleNotification(self, cHandle, data):
        self.notified.emit(self.data)

    def _manage_notifications(self):
        while self.peripheral.waitForNotifications(1):
            continue


def buttonClicked():
    QtWidgets.QApplication.closeAllWindows()


def updateValues(values):
    print("value updated", values)


def main(args):

    app = QtWidgets.QApplication(args)

    bt_addrs = ["c1:49:02:59:ae:50", "f3:ad:ed:46:ea:16"]

    managers = []

    for addr in bt_addrs:
        try:
            p = Peripheral(addr, "random")
        except BTLEDisconnectError as e:
            print(e)
        else:
            manager = PeripheralManager(p)
            manager.notified.connect(updateValues)
            managers.append(manager)

    window = uic.loadUi("mainwindow.ui")
    window.pushButton.clicked.connect(buttonClicked)
    window.show()

    ret = app.exec_()

    return ret


if __name__ == "__main__":
    import sys

    sys.exit(main(sys.argv))
© www.soinside.com 2019 - 2024. All rights reserved.