用于python的enos mqtt sdk
enos-mqtt-sdk-python的Python项目详细描述
使用enos device sdk for mqtt for python(预览版)
此repo包含用于mqtt for python的enos设备sdk预览版。本文将指导如何准备开发环境,以便使用enos device sdk for mqtt for python。
Installing Python
To use the EnOS Device SDK for MQTT for Python, you will need Python 2.7.13+ or 3.5.3+, and ^{
Obtaining EnOS Device SDK for MQTT for Python
You can obtain the SDK through the following methods:
- Install from pip
- Download the source code by cloning this repo and build on your machine
Installing from PIP
Use the following command to install EnOS Device SDK for MQTT for Python from PIP.
^{pr 1}$Building from Source Code
Obtain the EnOS Device SDK for MQTT for Python source code.
- From GitHub:
- From EnOS SDK Center. Click SDK Center from the left navigation of EnOS Console, and obtain the SDK source code.
From the directory where the source code is stored, run the following command:
^{pr 3}$
Key Features
As the preview edition, the EnOS Device SDK for MQTT for Python currently contains only partial of the EnOS connection features as listed below:
- MQTT over TCP based on username and password and MQTT over TLS 1.2 with X.509 certificate-based mutual authentication supported
- Create, read, update and delete (CRUD) gateway device topology
- Publish measure point data
Sample Code
Preparation (Cloud Configuration and Certificate Generation)
Create device models (including gateway and sub-device), products, devices, and applications in the EnOS Console, and get the corresponding product key-secret and device key-secret pair. For more information, see Cloud Configuration在enos文档中心。
获取ca根证书,生成本地rsa私钥和csr文件,并通过enos证书工具(enos sdk中心提供)从enos应用证书。
建立连接
- 初始化mqtt客户机并加载证书文件。
client=MqttClient(enos_mqtt_url,gateway_product_key,gateway_device_key,gateway_device_secret)client.getProfile().setSSLContext(ca_file,cer_file,key_file,key_file_password)client.getProfile().setAutoReconnect(True)# if connection interrupted, the client can automaticlly reconnect
- 设置联机和脱机回调方法。
client.onOnline=onOnlineclient.onOffline=onOffineclient.onConnectFailed=onConnectFailed
- 以同步模式连接。
client.connect()
注册handle消息以处理下游接收的measurepoint
client.onMessage(handle_msg)
发送遥测
- 为子设备添加拓扑
topo_req=TopoAddRequest.builder().addSubDevice(SubDeviceInfo(sub_product_key,sub_device_key,sub_device_secret)).build()topo_res=client.publish(topo_req)
- 登录子设备
login_req=SubDeviceLoginRequest.builder().setSubDeviceInfo(sub_product_key,sub_device_key,sub_device_secret).build()login_res=client.publish(login_req)
- 从MQTT客户机发送遥测
meapt_req=MeasurepointPostRequest.builder() \ .setProductKey(sub_product_key).setDeviceKey(sub_device_key) \ .addMeasurePoint('MeasurePoint1',value1) \ # the measure point identity.addMeasurePoint('MeasurePoint2',value2) \ .setTimestamp(timestamp) \ .build()meapt_res=client.publish(meapt_req)
端到端样本
frommessage.upstream.status.SubDeviceLoginRequestimportSubDeviceLoginRequestfromcore.MqttClientimportMqttClientfrommessage.upstream.topo.TopoAddRequestimportTopoAddRequestfrommessage.upstream.topo.SubDeviceInfoimportSubDeviceInfofrommessage.upstream.tsl.MeasurepointPostRequestimportMeasurepointPostRequestfrommessage.upstream.topo.TopoGetRequestimportTopoGetRequestfrommessage.upstream.topo.TopoDeleteRequestimportTopoDeleteRequestimportrandomimporttime# mqtt broker urlenos_mqtt_url="tcp://{HOST}:11883"# for tcp connection# gateway parametersgateway_product_key="GATEWAY_PRODUCT_KEY"gateway_product_secret='GATEWAY_PRODUCT_SECRET'gateway_device_key="GATEWAY_DEVICE_KEY"gateway_device_secret="GATEWAY_DEVICE_SECRET"# sub-device parameterssub_product_key='SUB_PRODUCT_KEY'sub_device_key="SUB_DEVICE_KEY"sub_device_secret="SUB_DEVICE_SECRET"# these file are generated by get_cert.py via EnOS cert tool, for tls connectionca_file='edge_ca.pem'key_file='edge.key'cer_file='edge.pem'key_file_password='PRIVATE_KEY_PASSWORD'defonConnectFailed():print('connect failed...')time.sleep(10)client.connect()defonOnline():globalconnectedconnected=Truelogin_sub_device(client)# login the sub-device if exists sub-deviceprint('connected...')defonOffine():globalconnectedconnected=Falseprint('disconnected...')defget_topo(mqtt_client):topo_get_req=TopoGetRequest.builder().build()topo_get_res=mqtt_client.publish(topo_get_req)iftopo_get_res:print('topo_response: code: %s'%topo_get_res.getCode())print(topo_get_res.getData())defadd_topo(mqtt_client):topo_req=TopoAddRequest.builder().addSubDevice(SubDeviceInfo(sub_product_key,sub_device_key,sub_device_secret)).build()topo_res=mqtt_client.publish(topo_req)iftopo_res:print('topo_response: code: %s'%topo_res.getCode())print('topo_response: message: %s'%topo_res.getMessage())defdelete_topo(mqtt_client):topo_del_req=TopoDeleteRequest.builder().addSubDevice(sub_product_key,sub_device_key).build()topo_del_res=mqtt_client.publish(topo_del_req)iftopo_del_res:print('topo_delete_response: %s'%topo_del_res.getCode())deflogin_sub_device(mqtt_client):login_req=SubDeviceLoginRequest.builder().setSubDeviceInfo(sub_product_key,sub_device_key,sub_device_secret).build()login_res=mqtt_client.publish(login_req)iflogin_res:print('login_response: code: %s'%login_res.getCode())print('login_response: message: %s'%login_res.getMessage())# post measure points data via MQTTdefpost_measure_points(mqtt_client,timestamp):meapt_req=MeasurepointPostRequest.builder() \ .setProductKey(sub_product_key).setDeviceKey(sub_device_key) \ .addMeasurePoint('MeasurePoint1',random.randint(100,200)) \ .addMeasurePoint('MeasurePoint2',random.randint(100,200)) \ .setTimestamp(timestamp) \ .build()meapt_res=mqtt_client.publish(meapt_req)ifmeapt_res:print('measurepointPost_response: %s'%meapt_res.getCode())# handle the received downstream message and implement your logicdefhandle_msg(arrivedMessage,replyHandler):''' :param arrivedMessage: <attributes:deviceKey,prodectKey,id,messageTopic,method,params,version> :param replyHandler: <method:replyWithPayload> '''# handle logicsuccess=1print(arrivedMessage.params)# set reply payloadifsuccess:code=200message='test'data=0else:code=2000message='test'data=1replyHandler.reply_with_payload(code=code,message=message,data=data)if__name__=="__main__":client=MqttClient(enos_mqtt_url,gateway_product_key,gateway_device_key,gateway_device_secret)# set the certificate files for bi-directional certificationclient.getProfile().setSSLContext(ca_file,cer_file,key_file,key_file_password)client.setupBasicLogger('INFO')client.onOnline=onOnlineclient.onOffline=onOffineclient.onConnectFailed=onConnectFailedconnected=Falseclient.connect()# connect in syncclient.onMessage(handle_msg)# register a handle_msg to handle the downstream received measurepointadd_topo(client)# add the device to the gateway as sub-devicewhileTrue:timestamp=int(time.time()*1000)# timestamp in millisecondspost_measure_points(client,timestamp)# publish measure points datatime.sleep(10)