我使用bluepy从配备了Python中的蓝牙低能设备的传感器获取数据。这个传感器是我自己设计的。因此,我可以访问传感器板上的处理
现在需要2秒钟才能从传感器获取数据。在传感器上,数据采集过程仅需200 ms。此延迟发生在“等待通知”(1.7秒)。我已将我的整个蓝牙通信类粘贴在下面。但是,简而言之,代码的工作原理如下:
while self.peripheral.waitForNotifications
等待通知李>因此,第2步需要时间(尽管传感器在大约200毫秒后开始发回)
有人知道是否有可能加快第二步吗
import struct
import bluepy.btle as btle
import numpy
import time
class ReadDelegate(btle.DefaultDelegate):
def __init__(self):
self.data = b''
def reset(self):
self.data = b''
def handleNotification(self, cHandle, data):
self.data = self.data + data
@property
def data_length(self):
return len(self.data)
class myHC08:
def __init__(self, verbose=False):
self.verbose = verbose
self.mac = '34:14:B5:50:22:ED'
self.write_service_id = 4
self.write_service = None
self.delegate = ReadDelegate()
self.peripheral = None
def print_output(self, message):
print('HC08: %s' % message)
def connect(self):
self.peripheral = btle.Peripheral(self.mac)
self.peripheral.withDelegate(self.delegate)
s = self.write_service_id
services = self.peripheral.getServices()
self.write_service = self.peripheral.getServiceByUUID(list(services)[s].uuid)
def wait_for_notification(self, time_out=5):
start = time.time()
while self.peripheral.waitForNotifications(1):
waiting = time.time() - start
if waiting > time_out: return False, False
waiting = time.time() - start
return True, waiting
def wait_for_data(self, min_bytes, time_out=5):
start = time.time()
while self.delegate.data_length < min_bytes:
waiting = time.time() - start
if waiting > time_out: return False, False
waiting = time.time() - start
return True, waiting
def write(self, message, min_bytes, unpack_string=False):
if self.verbose: self.print_output('Writing %s' % message)
self.delegate.reset()
c = self.write_service.getCharacteristics()[0]
c.write(bytes(message, "utf-8"))
self.verbose: print("HC08 Receiving %i bytes" % min_bytes)
if self.verbose: self.print_output('Waiting for notification')
success, duration = self.wait_for_notification()
if not success: return False
if self.verbose: self.print_output('Duration: %.10f' % duration)
if self.verbose: self.print_output('Waiting for data')
success, duration = self.wait_for_data(min_bytes=min_bytes)
if not success: return False
if self.verbose: self.print_output('Duration: %.10f' % duration)
received = self.delegate.data
if unpack_string:
received = struct.unpack(unpack_string, received)
received = numpy.array(received)
return received
def write_robust(self, message, min_bytes, unpack_string=False, max_retry=3):
for attempt in range(max_retry):
if self.verbose: self.print_output('Attempt %i out of %i' % (attempt, max_retry))
result = self.write(message, min_bytes, unpack_string)
if result is not False: return result
if not result: self.reconnect()
if not result: raise ('Error could not get data in %i attempts.' % max_retry)
def disconnect(self):
self.peripheral.disconnect()
def reconnect(self):
self.print_output('Reconnecting')
for x in range(5): self.disconnect()
self.connect()
目前没有回答
相关问题 更多 >
编程相关推荐