连接到Wolkabout物联网平台的网关通信模块的sdk
wolk-gateway-module的Python项目详细描述
wolkgatewaymodule sdk python
python 3包,用于通过WolkGateway将设备连接到wolkabout物联网平台。
此软件包用于开发Wolkgateway模块,使无IP连接的设备能够将其数据发送到Wolkabout IoT平台。
用户负责提供通常包含设备的网络通信协议的自定义实现,并提供业务逻辑和与所用硬件相关的所有内容以及它们特定用例的细节。
但是,通过wolkconnect-bus处理程序指向网关的所有通信都已随此包提供,这是一个用python 3.7编写的开源实现,它使用基于tcp/ip的mqtt协议与WolkGateway通信。
要求
- Python3.7
通过调用:
sudo apt-get install python3.7 python3-pip && python3 -m pip install pip && python3.7 -m pip install pip
安装
可以使用python的包管理器pip:
sudo python3.7 -m pip install wolk-gateway-module
或者通过克隆存储库并运行:
sudo python3.7 -m pip install -r requirements.txtpython3.7 setup.py install
示例用法
创建设备
importwolk_gateway_moduleaswolk# Create device sensors# Use data_type parameter where reading type & unit symbol are not importantgeneric_sensor=wolk.SensorTemplate(name="Generic sensor",reference="G",# References must be unique per devicedata_type=wolk.DataType.NUMERIC,description="Optional description",minimum=0,# Optional minimum valuemaximum=100,# Optional maximum value)temperature_sensor=wolk.SensorTemplate(name="Temperature",reference="T",reading_type_name=wolk.ReadingTypeName.TEMPERATURE,unit=wolk.ReadingTypeMeasurementUnit.CELSIUS,minimum=-20,maximum=85,description="Temperature sensor with range -20 to 85 Celsius",)# Create a device template used to register the devicedevice_template=wolk.DeviceTemplate(sensors=[generic_sensor,temperature_sensor])# Create a devicedevice=wolk.Device(name="Device",key="DEVICE_KEY",# Unique device keytemplate=device_template)
与Wolkgateway建立连接
# Implement a device status providerdefget_device_status(device_key:str)->wolk.DeviceStatus:"""Return current device status."""ifdevice_key=="DEVICE_KEY":# Handle getting current device status herereturnwolk.DeviceStatus.CONNECTEDwolk_module=wolk.Wolk(host="localhost",# Host address of WolkGatewayport=1883,# TCP/IP port used for WolkGateway's MQTT brokermodule_name="Python module",# Used for connection authenticationdevice_status_provider=get_device_status,)wolk_module.connect()
与Wolkgateway断开连接
wolk_module.disconnect()
添加设备
设备需要在其数据被视为有效之前在平台上注册。 这可以通过调用:
wolk_module.add_device(device)
要停止侦听特定设备的命令,请使用:
wolk_module.remove_device(device)
这只会停止确认入站命令,要完全删除设备,请使用wolkgateway或web应用程序,具体取决于谁控制设备。
发布设备状态
通过调用提供的device_status_provider
函数获得设备状态
wolk_module.publish_device_status("DEVICE_KEY")
增加传感器读数
wolk_module.add_sensor_reading("DEVICE_KEY","REFERENCE","value")# For reading with data size > 1, like location or acceleration use tupleswolk_module.add_sensor_reading("DEVICE_KEY","LOC",(24.534,-34.325))# Add timestamps to store when reading occurred to preserve history, otherwise# Platform will assign timestamp when it receives the readingwolk_module.add_sensor_reading("KEY","R",12,int(round(time.time()*1000)))
此方法将把序列化的消息放入存储。
发布存储的消息
wolk_module.publish()# Publish all stored messageswolk_module.publish("DEVICE_KEY")# Publish all stored messages for device
警报
humidity_alarm=wolk.AlarmTemplate(name="High Humidity",reference="HH",description="High humidity has been detected")device_template=wolk.DeviceTemplate(alarms=[humidity_alarm])# Create device, Wolk instance, add device, connect...# Will place alarm message into storage, use publish method to sendwolk_module.add_alarm("DEVICE_KEY","HH",active=True,timestamp=None)
执行器
为了控制设备执行器,提供actuation_handler
和actuator_status_provider
。
switch_actuator=wolk.ActuatorTemplate(name="Switch",reference="SW",data_type=wolk.DataType.BOOLEAN,description="Light switch",)slider_actuator=wolk.ActuatorTemplate(name="Slider",reference="SL",data_type=wolk.DataType.NUMERIC,minimum=0,maximum=100,description="Light dimmer",)device_template=wolk.DeviceTemplate(actuators=[switch_actuator,slider_actuator])device=wolk.Device("Device","DEVICE_KEY",device_template)defhandle_actuation(device_key:str,reference:str,value:Union[bool,int,float,str])->None:""" Set device actuator identified by reference to value. Must be implemented as non blocking. Must be implemented as thread safe. """ifdevice_key=="DEVICE_KEY":ifreference=="SW":# Handle setting the value hereswitch.value=valueelifreference=="SL":slider.value=valuedefget_actuator_status(device_key:str,reference:str)->Tuple[wolk.ActuatorState,Union[bool,int,float,str]]:""" Get current actuator status identified by device key and reference. Reads the status of actuator from the device and returns it as a tuple containing the actuator state and current value. Must be implemented as non blocking. Must be implemented as thread safe. """ifdevice_key=="DEVICE_KEY":ifreference=="SW":# Handle getting current actuator value herereturnwolk.ActuatorState.READY,switch.valueelifreference=="SL":returnwolk.ActuatorState.READY,slider.value# Pass functions to Wolk instancewolk_module=wolk.Wolk(host="localhost",port=1883,module_name="Python module",device_status_provider=get_device_status,actuation_handler=handle_actuation,actuator_status_provider=get_actuator_status,)wolk_module.add_device(device)wolk_module.connect()# This method will call the provided actuator_status_provider function# and publish the state immediately or store message if unable to publishwolk_module.publish_actuator_status("DEVICE_KEY","SW")wolk_module.publish_actuator_status("DEVICE_KEY","SL")
配置
与执行器类似,使用设备配置选项需要为configuration_handler
实例提供configuration_provider
和Wolk
。
logging_level_configuration=wolk.ConfigurationTemplate(name="Logging level",reference="LL",data_type=wolk.DataType.STRING,default_value="INFO",description="eg. Set device logging level",)logging_interval_configuration=wolk.ConfigurationTemplate(name="Logging interval",reference="LI",data_type=wolk.DataType.NUMERIC,size=3,labels=["seconds","minutes","hours"],description="eg. Set logging intervals",)device_template=wolk.DeviceTemplate(configurations=[logging_level_configuration,logging_level_configuration])device=wolk.Device("Device","DEVICE_KEY",device_template)defget_configuration(device_key:str)->Dict[str,Union[int,float,bool,str,Tuple[int,int],Tuple[int,int,int],Tuple[float,float],Tuple[float,float,float],Tuple[str,str],Tuple[str,str,str],],]:""" Get current configuration options. Reads device configuration and returns it as a dictionary with device configuration reference as key, and device configuration value as value. Must be implemented as non blocking. Must be implemented as thread safe. """ifdevice_key=="DEVICE_KEY":# Handle getting configuration values herereturn{"LL":get_log_level(),"LI":get_log_inteval(),}defhandle_configuration(device_key:str,configuration:Dict[str,Union[int,float,bool,str,Tuple[int,int],Tuple[int,int,int],Tuple[float,float],Tuple[float,float,float],Tuple[str,str],Tuple[str,str,str],],],)->None:""" Change device's configuration options. Must be implemented as non blocking. Must be implemented as thread safe. """ifdevice_key=="DEVICE_KEY":forreference,valueinconfiguration.items():# Handle setting configuration values hereifreference=="LL":set_log_level(value)elifreference=="LI":set_log_interval(value)# Pass functions to Wolk instancewolk_module=wolk.Wolk(host="localhost",port=1883,module_name="Python module",device_status_provider=get_device_status,configuration_provider=get_configuration,configuration_handler=handle_configuration,)wolk_module.add_device(device)wolk_module.connect()# This method will call the provided configuration_provider function# and publish the state immediately or store message if unable to publishwolk_module.publish_configuration("DEVICE_KEY")
固件更新
为了启用设备的固件更新,提供FirmwareHandler
的实现并传递给Wolk
实例。
device_template=wolk.DeviceTemplate(supports_firmware_update=True)device=wolk.Device("Device","DEVICE_KEY",device_template)classFirmwareHandlerImplementation(wolk.FirmwareHandler):"""Handle firmware installation and abort commands, and report version. Once an object of this class is passed to a Wolk object, it will set callback methods `on_install_success` and `on_install_fail` used for reporting the result of the firmware update process. Use these callbacks in `install_firmware` and `abort_installation` methods."""definstall_firmware(self,device_key:str,firmware_file_path:str)->None:""" Handle the installation of the firmware file. Call `self.on_install_success(device_key)` to report success. Reporting success will also get new firmware version. If installation fails, call `self.on_install_fail(device_key, status)` where: `status = FirmwareUpdateStatus( FirmwareUpdateState.ERROR, FirmwareUpdateErrorCode.INSTALLATION_FAILED )` or use other values from `FirmwareUpdateErrorCode` if they fit better. """ifdevice_key=="DEVICE_KEY":print(f"Installing firmware: '{firmware_file_path}' "f"on device '{device_key}'")# Handle the actual installation hereifinstall_success:self.on_install_success(device_key)else:status=wolk.FirmwareUpdateStatus(wolk.FirmwareUpdateState.ERROR,wolk.FirmwareUpdateErrorCode.INSTALLATION_FAILED,)self.on_install_fail(device_key,status)defabort_installation(self,device_key:str)->None:""" Attempt to abort the firmware installation process for device. Call `self.on_install_fail(device_key, status)` to report if the installation process was able to be aborted with `status = FirmwareUpdateStatus(FirmwareUpdateState.ABORTED)` If unable to stop the installation process, no action is required. """ifdevice_key=="DEVICE_KEY":# Manage to stop firmware installationstatus=wolk.FirmwareUpdateStatus(wolk.FirmwareUpdateState.ABORTED)self.on_install_fail(device_key,status)defget_firmware_version(self,device_key:str)->str:"""Return device's current firmware version."""ifdevice_key=="DEVICE_KEY":# Handle getting the current firmware version herereturnversionwolk_module=wolk.Wolk(host="localhost",port=1883,module_name="Python module",device_status_provider=get_device_status,firmware_handler=FirmwareHandlerImplementation(),)wolk_module.add_device(device)wolk_module.connect()
调试
启用调试日志记录:
wolk.logging_config("debug",log_file=None)
数据持久性
默认情况下,数据持久性机制使用存储在内存中的消息。
如果在内存中提供的持久性是次优的,则可以通过实现OutboundMessageQueue
并以以下方式传递它来使用自定义持久性:
wolk_module=wolk.Wolk(host="localhost",port=1883,module_name="Python module",device_status_provider=get_device_status,outbound_message_queue=CustomPersistence())