在Windows 11和Raspberry Pi 4上运行的使用蓝牙的Python脚本
我遇到了一个问题,一个Python脚本在Windows 11(通过VS)上运行的效果和在树莓派4上运行的效果不一样。
这个脚本是从一个命令行脚本修改而来的,目的是与Victron Energy的硬件进行交互。
多亏了@ukbaz的帮助,这个脚本在我的面包车上成功运行,并从一个通过蓝牙连接的Victron设备获取了数据。脚本中使用了一行代码:
loop.run_forever()
太好了!在这两台机器上都能完美运行。我不想让它一直运行,只需要运行5秒钟,足够读取蓝牙设备的数据并写入一个文本文件,以便以后使用。
在引入一个更改后:
the_end = time.time() + 5
while time.time() < the_end:
loop.stop()
loop.run_forever()
这个脚本在Windows上运行得很好,可以读取蓝牙设备并写入文件。但在树莓派上,脚本运行没有错误,但却无法读取蓝牙设备或写入文件。
这是完整的脚本:
from __future__ import annotations
import inspect
import json
import logging
import time
from enum import Enum
from typing import Set
import asyncio
from typing import List, Tuple
from threading import Thread
from bleak import BleakScanner
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from victron_ble.devices import Device, DeviceData, detect_device_type
from victron_ble.exceptions import AdvertisementKeyMissingError, UnknownDeviceError
logger = logging.getLogger(__name__)
class BaseScanner:
def __init__(self) -> None:
"""Initialize the scanner."""
self._scanner: BleakScanner = BleakScanner(
detection_callback=self._detection_callback
)
self._seen_data: Set[bytes] = set()
def _detection_callback(self, device: BLEDevice, advertisement: AdvertisementData):
# Filter for Victron devices and instant readout advertisements
data = advertisement.manufacturer_data.get(0x02E1)
if not data or not data.startswith(b"\x10") or data in self._seen_data:
return
# De-duplicate advertisements
if len(self._seen_data) > 1000:
self._seen_data = set()
self._seen_data.add(data)
self.callback(device, data)
def callback(self, device: BLEDevice, data: bytes):
raise NotImplementedError()
async def start(self):
await self._scanner.start()
async def stop(self):
await self._scanner.stop()
# An ugly hack to print a class as JSON
class DeviceDataEncoder(json.JSONEncoder):
def default(self, obj):
if issubclass(obj.__class__, DeviceData):
data = {}
for name, method in inspect.getmembers(obj, predicate=inspect.ismethod):
if name.startswith("get_"):
value = method()
if isinstance(value, Enum):
value = value.name.lower()
if value is not None:
data[name[4:]] = value
return data
class Scanner(BaseScanner):
def __init__(self, device_keys: dict[str, str] = {}):
super().__init__()
self._device_keys = {k.lower(): v for k, v in device_keys.items()}
self._known_devices: dict[str, Device] = {}
async def start(self):
logger.info(f"Reading data for {list(self._device_keys.keys())}")
await super().start()
def get_device(self, ble_device: BLEDevice, raw_data: bytes) -> Device:
address = ble_device.address.lower()
if address not in self._known_devices:
advertisement_key = self.load_key(address)
device_klass = detect_device_type(raw_data)
if not device_klass:
raise UnknownDeviceError(
f"Could not identify device type for {ble_device}"
)
self._known_devices[address] = device_klass(advertisement_key)
return self._known_devices[address]
def load_key(self, address: str) -> str:
try:
return self._device_keys[address]
except KeyError:
raise AdvertisementKeyMissingError(f"No key available for {address}")
def callback(self, ble_device: BLEDevice, raw_data: bytes):
logger.debug(
f"Received data from {ble_device.address.lower()}: {raw_data.hex()}"
)
try:
device = self.get_device(ble_device, raw_data)
except AdvertisementKeyMissingError:
return
except UnknownDeviceError as e:
logger.error(e)
return
parsed = device.parse(raw_data)
blob = {
"name": ble_device.name,
"address": ble_device.address,
"rssi": ble_device.rssi,
"payload": parsed,
}
print(json.dumps(blob, cls=DeviceDataEncoder, indent=1))
ve_string = json.dumps(blob, cls=DeviceDataEncoder, indent=1)
print(ve_string)
#MAC_filename = "this_device" + ".txt"
#print(f"MAC filename: {MAC_filename}")
this_file = open("this_device.txt", "w")
this_file.write(ve_string)
this_file.close()
print("file written")
time.sleep(3)
class DiscoveryScanner(BaseScanner):
def __init__(self) -> None:
super().__init__()
self._seen_devices: Set[str] = set()
def callback(self, device: BLEDevice, advertisement: bytes):
if device.address not in self._seen_devices:
logger.info(f"{device}")
self._seen_devices.add(device.address)
class DebugScanner(BaseScanner):
def __init__(self, address: str):
super().__init__()
self.address = address
async def start(self):
logger.info(f"Dumping advertisements from {self.address}")
await super().start()
def callback(self, device: BLEDevice, data: bytes):
if device.address.lower() == self.address.lower():
logger.info(f"{time.time():<24}: {data.hex()}")
def my_scan(device_keys: List[Tuple[str, str]]):
loop = asyncio.get_event_loop()
async def scan(keys):
scanner = Scanner(keys)
await scanner.start()
asyncio.ensure_future(scan({k: v for k, v in device_keys}))
the_end = time.time() + 5
while time.time() < the_end:
loop.stop()
loop.run_forever()
if __name__ == '__main__':
my_scan([("d5:55:aa:4d:99:33","149c3c2865054b71962dcb06866524a9")])
我尝试从
run_forever
改为
run_until_complete
这个方法在树莓派上完全失败了 :( 所以我又改回来了。
为什么这个脚本在两台机器上表现得不一样?我该如何在树莓派上实现相同的效果?非常感谢任何帮助。
1 个回答
1
我猜您想做的事情是等文件写完后再退出。
也许您可以使用 asyncio 的事件功能?
https://docs.python.org/3/library/asyncio-sync.html#event
在文件顶部的导入语句后面,您可以在全局范围内创建一个事件,代码如下:
file_written_event = asyncio.Event()
然后在文件写入完成后,添加 file_written_event.set()
。比如:
this_file = open("this_device.txt", "w")
this_file.write(ve_string)
this_file.close()
print("file written")
file_written_event.set()
您文件的底部会有比较大的变化。它的功能基本相同,但会有一个循环在等待事件发生后再退出。
async def my_scan(device_keys: List[Tuple[str, str]]):
async def scan(keys):
scanner = Scanner(keys)
await scanner.start()
asyncio.ensure_future(scan({k: v for k, v in device_keys}))
while not file_written_event.is_set():
await asyncio.sleep(0.1)
if __name__ == '__main__':
logging.basicConfig()
logger.setLevel(logging.DEBUG)
asyncio.run(my_scan([("d5:55:aa:4d:99:33","149c3c2865054b71962dcb06866524a9")]))