当另一个对象在其他地方实例化时,如何从该对象接收PyQt信号?

2024-09-28 20:48:20 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在构建一个系统,通过蓝牙LE将基于Arduino的传感器连接到RPi,并在GUI上显示信息(温度和电池寿命)。我的程序中有两个主要类,一个管理GUI,另一个管理BLE连接(class HubSensor)。一个HubSensor对象获取每个传感器的MAC地址,并且应该发出一个附加元组的信号,该元组包含感测到的温度、电池寿命和一个索引整数,以让主程序知道它是哪个传感器。HubSensor每秒获取一次信息,每次都应该发送信号。(已经建立了输入验证,但它与我的问题无关。)到目前为止,大部分工作正常。你知道吗

我的问题是我不知道如何创建一个插槽来接收信号,这样它就可以更新显示(然后在CSV文件中保留一个日志)。我正在使用BluePy库来管理BLE连接,这对我来说是一个额外的挑战。你知道吗

所以,我的程序就是这样工作的(我想)。每个线程(因为我有多个传感器)创建一个HubSensor对象。当对象建立BLE连接时,它会创建一个MyDelegate对象(从BluePy的DefaultDelegate子类化)。在MyDelegate对象内部,Qt信号被发射。我需要访问所有这些类之外的信号,因为我不知道创建的MyDelegate对象的名称,所以我不知道如何访问它。你知道吗

我试过让上面提到的每一个类都继承彼此的特性,但我不确定我做得是否正确。你知道吗

从trailerTempDisplay.py文件你知道吗

import sys
from PyQt5.QtCore import *
from HubSensor import *
from PyQt5 import QtWidgets, uic
from bluepy.btle import *

from datetime import datetime


# mac addresses for the sensors.  later, this will need a function to allow new devices to connect
bt_addrs = ['c1:49:02:59:ae:50', 'f3:ad:ed:46:ea:16']

app = QtWidgets.QApplication(sys.argv)


class Worker(QRunnable):
    def __init__(self, macAddress, ind):
        super(Worker, self).__init__()
        self.macAddress = macAddress
        self.ind = ind

    @pyqtSlot()
    #this is where each sensor exists.  each object is created and runs here
    def run(self):
        self.sensor = HubSensor(self.macAddress, self.ind)
        self.sensor.notified.connect(self.updateValues())

#close button
def buttonClicked():
    app.closeAllWindows()

window = uic.loadUi("mainwindow.ui")
window.pushButton.clicked.connect(buttonClicked)


def updateValues(self):
    print("value updated")       # debugging


window.show()
window.threadpool = QThreadPool()
index = 0
for addr in bt_addrs:
    worker = Worker(addr, index)
    index += 1
    window.threadpool.start(worker)
app.exec()

从轮毂传感器.py你知道吗

from bluepy.btle import *
from PyQt5.QtCore import QObject, pyqtSignal


class MyDelegate(DefaultDelegate, QObject):
    def __init__(self, index):
        DefaultDelegate.__init__(self)
        QObject.__init__(self)
        self.index = index

    # class variable for the notified signal
    notified = pyqtSignal(tuple)

    def handleNotification(self, cHandle, data):
        # exception handling prevents bad data from being passed.  cHandle is not used but might be useful later
        try:

            # defining the sensorData tuple.  1, 2, and 3 are dummy values
            self.sensorData = (1, 2, 3)
            self.notified.emit(self.sensorData)  # this should emit a signal every time the function is called and send a tuple with temp, battery, and sensor index(id)

        except (ValueError, IndexError):
            pass


class HubSensor(MyDelegate):
    # constructor.  connects to device defined by mac address and position.
    # uuid is static and should not change

    def __init__(self, mac, index):
        self.index = index  # keeps track of sensor position
        self.mac = mac
        self.p = Peripheral(self.mac, 'random')  # adafruit feathers must be 'random' for some reason
        self.p.setDelegate(MyDelegate(self.index))
        self.p.writeCharacteristic(35, b'\x01\x00')  # writing these bits to handle '35' enables notifications
        while True:
            if self.p.waitForNotifications(1):
                # when a bluetooth notification is received, handleNotification is called
                continue

当我运行程序时,“值更新”不会显示在控制台上。它应该每秒弹出两次,然后重复。稍后,我将添加将值传递到GUI显示的部分。你知道吗

让我提前道歉,因为我还是个初学者。我想我已经包括了我的代码的所有相关部分,但我不确定。另外,我很肯定我在某些地方的术语是不正确的,所以我希望你们都能理解我真正的意思。提前谢谢你能给我的任何帮助!你知道吗


Tags: and对象fromimportselfindex信号init
1条回答
网友
1楼 · 发布于 2024-09-28 20:48:20

您的代码很混乱,例如HubSensor是一个委托,其属性是一个外设,而该外设有另一个委托,以及其他问题。你知道吗

因此,我没有依赖于您的代码,而是创建了peripheraldemanager类,它将通过信号通知指定的外围设备接收到的信息。在无限循环的情况下,它将在线程中使用穿线。穿线. 你知道吗

import threading

from PyQt5 import QtCore, QtWidgets, uic
from bluepy.btle import DefaultDelegate, Peripheral, BTLEDisconnectError


class PeripheralManager(QtCore.QObject, DefaultDelegate):
    notified = QtCore.pyqtSignal(bytes)

    def __init__(self, peripheral, parent=None):
        super().__init__(parent)
        self._peripheral = peripheral
        self.peripheral.setDelegate(self)
        self.peripheral.writeCharacteristic(35, b"\x01\x00")
        threading.Thread(target=self._manage_notifications, daemon=True).start()

    @property
    def peripheral(self):
        return self._peripheral

    def handleNotification(self, cHandle, data):
        self.notified.emit(self.data)

    def _manage_notifications(self):
        while self.peripheral.waitForNotifications(1):
            continue


def buttonClicked():
    QtWidgets.QApplication.closeAllWindows()


def updateValues(values):
    print("value updated", values)


def main(args):

    app = QtWidgets.QApplication(args)

    bt_addrs = ["c1:49:02:59:ae:50", "f3:ad:ed:46:ea:16"]

    managers = []

    for addr in bt_addrs:
        try:
            p = Peripheral(addr, "random")
        except BTLEDisconnectError as e:
            print(e)
        else:
            manager = PeripheralManager(p)
            manager.notified.connect(updateValues)
            managers.append(manager)

    window = uic.loadUi("mainwindow.ui")
    window.pushButton.clicked.connect(buttonClicked)
    window.show()

    ret = app.exec_()

    return ret


if __name__ == "__main__":
    import sys

    sys.exit(main(sys.argv))

相关问题 更多 >