在python中使用BLUE和bluepy加速等待通知

2024-04-24 03:29:25 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用bluepy从配备了Python中的蓝牙低能设备的传感器获取数据。这个传感器是我自己设计的。因此,我可以访问传感器板上的处理

现在需要2秒钟才能从传感器获取数据。在传感器上,数据采集过程仅需200 ms。此延迟发生在“等待通知”(1.7秒)。我已将我的整个蓝牙通信类粘贴在下面。但是,简而言之,代码的工作原理如下:

  1. 通过蓝牙向传感器写入一个字符串(传感器解析该字符串以查看其应返回的数据类型)
  2. 使用while self.peripheral.waitForNotifications等待通知
  3. 读取数据(这是快速的<;<;1毫秒)

因此,第2步需要时间(尽管传感器在大约200毫秒后开始发回)

有人知道是否有可能加快第二步吗

import struct
import bluepy.btle as btle
import numpy
import time


class ReadDelegate(btle.DefaultDelegate):
    def __init__(self):
        self.data = b''

    def reset(self):
        self.data = b''

    def handleNotification(self, cHandle, data):
        self.data = self.data + data

    @property
    def data_length(self):
        return len(self.data)


class myHC08:
    def __init__(self, verbose=False):
        self.verbose = verbose
        self.mac = '34:14:B5:50:22:ED'
        self.write_service_id = 4
        self.write_service = None
        self.delegate = ReadDelegate()
        self.peripheral = None

    def print_output(self, message):
        print('HC08: %s' % message)

    def connect(self):
        self.peripheral = btle.Peripheral(self.mac)
        self.peripheral.withDelegate(self.delegate)
        s = self.write_service_id
        services = self.peripheral.getServices()
        self.write_service = self.peripheral.getServiceByUUID(list(services)[s].uuid)

    def wait_for_notification(self, time_out=5):
        start = time.time()
        while self.peripheral.waitForNotifications(1):
            waiting = time.time() - start
            if waiting > time_out: return False, False
        waiting = time.time() - start
        return True, waiting

    def wait_for_data(self, min_bytes, time_out=5):
        start = time.time()
        while self.delegate.data_length < min_bytes:
            waiting = time.time() - start
            if waiting > time_out: return False, False
        waiting = time.time() - start
        return True, waiting

    def write(self, message, min_bytes, unpack_string=False):
        if self.verbose: self.print_output('Writing %s' % message)
        self.delegate.reset()
        c = self.write_service.getCharacteristics()[0]
        c.write(bytes(message, "utf-8"))
        self.verbose: print("HC08 Receiving %i bytes" % min_bytes)

        if self.verbose: self.print_output('Waiting for notification')
        success, duration = self.wait_for_notification()
        if not success: return False
        if self.verbose: self.print_output('Duration: %.10f' % duration)

        if self.verbose: self.print_output('Waiting for data')
        success, duration = self.wait_for_data(min_bytes=min_bytes)
        if not success: return False
        if self.verbose: self.print_output('Duration: %.10f' % duration)

        received = self.delegate.data
        if unpack_string:
            received = struct.unpack(unpack_string, received)
            received = numpy.array(received)
        return received

    def write_robust(self, message, min_bytes, unpack_string=False, max_retry=3):
        for attempt in range(max_retry):
            if self.verbose: self.print_output('Attempt %i out of %i' % (attempt, max_retry))
            result = self.write(message, min_bytes, unpack_string)
            if result is not False: return result
            if not result: self.reconnect()
        if not result: raise ('Error could not get data in %i attempts.' % max_retry)

    def disconnect(self):
        self.peripheral.disconnect()

    def reconnect(self):
        self.print_output('Reconnecting')
        for x in range(5): self.disconnect()
        self.connect()


Tags: selffalseforoutputverbosedatareturnif