我已经在 Raspberry Pi 上用 BLE GATT 服务器广播了传感器数值,但我不清楚该如何在本地 PC 或笔记本电脑上接收这些数据?

2024-04-25 07:29:43 发布

您现在位置:Python中文网/ 问答频道 /正文

这是我可以正常工作的 BLE GATT 服务器代码,它会广播超声波传感器的数据。我已经用“nRF工具箱应用程序”测试过,在该应用程序中距离可以正确显示。但我想在本地 PC 或笔记本电脑上显示这个数值,应该怎样才能做到?请帮忙。

import sys
import time
import RPi.GPIO as gpio
import dbus
import dbus.mainloop.glib
from gi.repository import GLib
from example_advertisement import Advertisement
from example_advertisement import register_ad_cb, register_ad_error_cb
from example_gatt_server import Service, Characteristic
from example_gatt_server import register_app_cb, register_app_error_cb

# D-Bus bus name used for every BlueZ proxy object created in this file.
BLUEZ_SERVICE_NAME = 'org.bluez'
# ObjectManager interface: used to enumerate BlueZ objects in find_adapter()
# and exported by Application.GetManagedObjects().
DBUS_OM_INTERFACE = 'org.freedesktop.DBus.ObjectManager'
# Adapter interfaces an object must expose to be selected by find_adapter().
LE_ADVERTISING_MANAGER_INTERFACE = 'org.bluez.LEAdvertisingManager1'
GATT_MANAGER_INTERFACE = 'org.bluez.GattManager1'
# Interface name passed to PropertiesChanged when a notification is sent.
GATT_CHARACTERISTIC_INTERFACE = 'org.bluez.GattCharacteristic1'
# 128-bit UUIDs of the UART-style service and its two characteristics
# (these are the values used by the Nordic UART Service). In this file the
# RX characteristic is writable by the remote side and TX is notify-only.
UART_SERVICE_UUID = '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
UART_RX_CHARACTERISTIC_UUID = '6e400002-b5a3-f393-e0a9-e50e24dcca9e'
UART_TX_CHARACTERISTIC_UUID = '6e400003-b5a3-f393-e0a9-e50e24dcca9e'
# Local name placed in the advertisement by UartAdvertisement.
LOCAL_NAME = 'rpi-gatt-server'
# GLib main loop instance; assigned in main().
mainloop = None


class TxCharacteristic(Characteristic):
    """Notify-only characteristic that publishes ultrasonic distance readings.

    A GLib timer periodically triggers a measurement and, while a client is
    subscribed, emits the reading as the characteristic's ``Value`` through a
    ``PropertiesChanged`` signal.
    """

    # BCM pin numbers of the sensor's trigger and echo lines.
    TRIG_PIN = 2
    ECHO_PIN = 3
    # Seconds the trigger line is held low before the pulse is sent.
    SETTLE_SECONDS = 2
    # Interval of the GLib timer that drives the polling.
    POLL_INTERVAL_SECONDS = 3
    # Upper bound for each wait on an echo edge, so an absent or miswired
    # sensor cannot hang the main loop forever.
    ECHO_TIMEOUT_SECONDS = 1.0

    def __init__(self, bus, index, service):
        """Create the characteristic and schedule periodic sensor polling.

        Args:
            bus: D-Bus connection handed to the ``Characteristic`` base class.
            index: Index of this characteristic within its service.
            service: Owning service object.
        """
        Characteristic.__init__(self, bus, index, UART_TX_CHARACTERISTIC_UUID,
                                ['notify'], service)
        self.notifying = False
        # Use a real timer source. Watching stdout for writability fires
        # continuously and needed a blocking sleep in the callback to pace it.
        GLib.timeout_add_seconds(self.POLL_INTERVAL_SECONDS,
                                 self.on_console_input)

    def _wait_while_echo(self, level):
        """Spin while the echo pin reads *level*; return the time it changed.

        Raises:
            TimeoutError: if the pin still reads *level* after
                ``ECHO_TIMEOUT_SECONDS``.
        """
        deadline = time.monotonic() + self.ECHO_TIMEOUT_SECONDS
        while gpio.input(self.ECHO_PIN) == level:
            if time.monotonic() > deadline:
                raise TimeoutError(
                    'no echo edge within {} s'.format(self.ECHO_TIMEOUT_SECONDS))
        return time.monotonic()

    def find_distance(self):
        """Take one measurement and return it as text, e.g. ``'12.34   cm'``.

        Returns:
            The distance rounded to two decimals, followed by three spaces
            and ``cm``.

        Raises:
            TimeoutError: if the echo line never changes state.
        """
        try:
            gpio.setmode(gpio.BCM)
            gpio.setup(self.TRIG_PIN, gpio.OUT)
            gpio.setup(self.ECHO_PIN, gpio.IN)

            # Hold the trigger low, then emit a 10 microsecond pulse.
            gpio.output(self.TRIG_PIN, gpio.LOW)
            time.sleep(self.SETTLE_SECONDS)
            gpio.output(self.TRIG_PIN, gpio.HIGH)
            time.sleep(0.00001)
            gpio.output(self.TRIG_PIN, gpio.LOW)

            # The echo pulse width is the round-trip time of the sound burst.
            # A monotonic clock keeps the duration immune to wall-clock jumps.
            start_time = self._wait_while_echo(gpio.LOW)
            stop_time = self._wait_while_echo(gpio.HIGH)
        finally:
            # Always release the pins, even when the measurement failed.
            gpio.cleanup()

        duration = stop_time - start_time
        # 17150 = 34300 cm/s (speed of sound) / 2 (out and back).
        distance = round((duration * 17150), 2)
        return f"{distance}{'':<3}cm"

    def on_console_input(self, f=None, condition=None):
        """Timer callback: measure and notify, then ask GLib to call again.

        ``f`` and ``condition`` are unused; they are kept (now optional) so
        the method can still be used as an I/O-watch callback.

        Returns:
            Always ``True`` so the GLib source stays installed.
        """
        # Measuring blocks the main loop for the settle time, so skip it
        # entirely while nobody is subscribed (the value would be dropped).
        if not self.notifying:
            return True
        try:
            data = self.find_distance()
        except TimeoutError as exc:
            print('distance read failed: {}'.format(exc))
            return True
        self.send_tx(data)
        return True

    def send_tx(self, s):
        """Emit the string *s* as the characteristic value if notifying."""
        if not self.notifying:
            return
        # Encode once and wrap each resulting byte; this also handles
        # characters whose encoding is longer than one byte.
        value = dbus.Array((dbus.Byte(b) for b in s.encode()), signature='y')
        self.PropertiesChanged(GATT_CHARACTERISTIC_INTERFACE, {'Value': value}, [])

    def StartNotify(self):
        """Mark the characteristic as having a subscribed client."""
        self.notifying = True

    def StopNotify(self):
        """Mark the characteristic as having no subscribed client."""
        self.notifying = False


class RxCharacteristic(Characteristic):
    """Write-only characteristic that prints whatever the remote side sends."""

    def __init__(self, bus, index, service):
        """Register the characteristic with the ``write`` flag.

        Args:
            bus: D-Bus connection handed to the ``Characteristic`` base class.
            index: Index of this characteristic within its service.
            service: Owning service object.
        """
        Characteristic.__init__(self, bus, index, UART_RX_CHARACTERISTIC_UUID,
                                ['write'], service)

    def WriteValue(self, value, options):
        """Print the bytes written by the remote device as text.

        Args:
            value: Iterable of byte values received from the remote device.
            options: Write options supplied by the caller; unused here.
        """
        # The payload comes from an arbitrary remote device and may not be
        # valid UTF-8; substitute undecodable bytes instead of raising.
        text = bytearray(value).decode(errors='replace')
        print('remote: {}'.format(text))


class UartService(Service):
    """GATT service that bundles the TX and RX characteristics."""

    def __init__(self, bus, index):
        """Create the service and attach its two characteristics.

        Args:
            bus: D-Bus connection handed to the ``Service`` base class.
            index: Index of this service within the application.
        """
        # The trailing True is forwarded to Service unchanged
        # (presumably its "primary" flag - confirm against Service).
        super().__init__(bus, index, UART_SERVICE_UUID, True)
        # TX gets characteristic index 0, RX gets index 1.
        for position, chrc_cls in enumerate((TxCharacteristic, RxCharacteristic)):
            self.add_characteristic(chrc_cls(bus, position, self))


# NOTE(review): this file imports dbus and dbus.mainloop.glib but never
# dbus.service explicitly; presumably it is loaded as a side effect of
# importing example_gatt_server - confirm, or import it directly.
class Application(dbus.service.Object):
    """Root D-Bus object that holds the services of a GATT application."""

    def __init__(self, bus):
        """Export the object at path ``/`` with an empty service list."""
        self.path = '/'
        self.services = []
        super().__init__(bus, self.path)

    def get_path(self):
        """Return this object's path as a ``dbus.ObjectPath``."""
        return dbus.ObjectPath(self.path)

    def add_service(self, service):
        """Append *service* to the list reported by GetManagedObjects."""
        self.services.append(service)

    @dbus.service.method(DBUS_OM_INTERFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """Return a mapping of object path to properties.

        Each registered service is listed first, followed by the
        characteristics it reports.
        """
        managed = {}
        for svc in self.services:
            managed[svc.get_path()] = svc.get_properties()
            managed.update(
                (chrc.get_path(), chrc.get_properties())
                for chrc in svc.get_characteristics()
            )
        return managed


class UartApplication(Application):
    """Application pre-populated with a single UartService."""

    def __init__(self, bus):
        """Create the application and register the UART service at index 0."""
        super().__init__(bus)
        uart_service = UartService(bus, 0)
        self.add_service(uart_service)


class UartAdvertisement(Advertisement):
    """Advertisement announcing the UART service under LOCAL_NAME."""

    def __init__(self, bus, index):
        """Configure a 'peripheral' advertisement.

        Args:
            bus: D-Bus connection handed to the ``Advertisement`` base class.
            index: Index of this advertisement.
        """
        super().__init__(bus, index, 'peripheral')
        # Advertise the service UUID and the local name.
        self.add_service_uuid(UART_SERVICE_UUID)
        self.add_local_name(LOCAL_NAME)
        self.include_tx_power = True


def find_adapter(bus):
    """Return the path of the first BlueZ object usable as a BLE peripheral.

    An object qualifies when it exposes both the LE advertising manager and
    the GATT manager interfaces. Every non-matching object is reported on
    stdout.

    Args:
        bus: D-Bus connection used to reach the BlueZ object manager.

    Returns:
        The matching object path, or ``None`` when nothing qualifies.
    """
    bluez_root = bus.get_object(BLUEZ_SERVICE_NAME, '/')
    object_manager = dbus.Interface(bluez_root, DBUS_OM_INTERFACE)
    required = (LE_ADVERTISING_MANAGER_INTERFACE, GATT_MANAGER_INTERFACE)

    for path, interfaces in object_manager.GetManagedObjects().items():
        if all(iface in interfaces for iface in required):
            return path
        print('Skip adapter:', path)
    return None


def main():
    """Register the GATT application and advertisement, then run the loop.

    Prints a message and returns early when no suitable adapter is found.
    The GLib main loop runs until interrupted from the keyboard, at which
    point the advertisement's ``Release`` method is called.
    """
    global mainloop

    # Must be installed before the bus connection is created so that D-Bus
    # traffic is dispatched by the GLib main loop.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()

    adapter_path = find_adapter(system_bus)
    if not adapter_path:
        print('BLE adapter not found')
        return

    # Two views of the adapter object: one per manager interface.
    gatt_manager = dbus.Interface(
        system_bus.get_object(BLUEZ_SERVICE_NAME, adapter_path),
        GATT_MANAGER_INTERFACE)
    advertising_manager = dbus.Interface(
        system_bus.get_object(BLUEZ_SERVICE_NAME, adapter_path),
        LE_ADVERTISING_MANAGER_INTERFACE)

    uart_app = UartApplication(system_bus)
    advertisement = UartAdvertisement(system_bus, 0)

    mainloop = GLib.MainLoop()

    # Both registrations are asynchronous; results arrive via the callbacks.
    gatt_manager.RegisterApplication(
        uart_app.get_path(), {},
        reply_handler=register_app_cb,
        error_handler=register_app_error_cb)
    advertising_manager.RegisterAdvertisement(
        advertisement.get_path(), {},
        reply_handler=register_ad_cb,
        error_handler=register_ad_error_cb)

    try:
        mainloop.run()
    except KeyboardInterrupt:
        advertisement.Release()


# Start the GATT server only when this file is executed as a script.
if __name__ == '__main__':
    main()

现在我该如何在本地 PC 或笔记本电脑上接收这些数据?以上就是我的全部代码。


Tags: import self register get index gpio return time