赞
踩
万物互联时代,DIY智能产品离不开物联网,本文意在记录快速接入流程,避免深陷官方文档中无法自拔。本文通过MQTT协议接入Onenet,用Python进行简单的调试,通过实验测试各项功能是否可靠。
https://open.iot.10086.cn/console
OneNET是由中国移动打造的PaaS物联网开放平台。类似的平台有阿里云,机智云等,之所以选择Onenet,是因为使用起来十分简洁高效,开发文档清晰明了,提供了许多API接口方便调试。
为了一劳永逸,本文对于Onenet平台中控制台的操作仅需要添加产品,其他操作全部通过API完成,方便快速移植,实现全自动部署。
注册登录后,进入控制台注册产品,
添加产品后暂不添加设备
进入产品页面查看以下信息
产品ID:493938
Master-APIkey:MrXYtMYoqeWzt5d=WQaCMja=6F8=
access_key:FDnETJpnXnyMA6g2feVcFkFHrv6PxVvIkVrYBdfS8/w=
设备注册码:r6jc2WyJluQrV36F
API使用详情:https://open.iot.10086.cn/doc/v5/develop/detail/481
为提高API访问安全性,OneNET API的鉴权参数作为header参数存在,API的访问有两种安全机制,普通方式直接传输密钥,不够安全,token鉴权更为安全,但操作上需要复杂了一步,就是需要增加生成token的算法,官方也有提供,先测试普通方式。
众所周知,HTTP请求方式有POST、GET、PUT等,请求最多三部分组成:URL、Header、Body,根据文档配置好三个部分的参数即可。
新增设备api-key使用产品级密钥Master-APIkey,Body使用json格式数据。
请求方式:POST
运行
#新增设备
import requests
import json
MasterAPIkey = "MrXYtMYoqeWzt5d=WQaCMja=6F8="#产品核心密钥
URL = "http://api.heclouds.com/devices"
Header = {"api-key" : MasterAPIkey}
Body = {"title": "test_device"}
r = requests.post(URL,json=Body,headers=Header).json()
print(json.dumps(r, indent=2))
返回
{
"errno": 0,
"data": {
"device_id": "906086916"
},
"error": "succ"
}
注册设备不需要密钥,只需要注册码,Body使用json格式数据。
请求方式:POST
运行
#注册设备
import requests
import json
register_code = "r6jc2WyJluQrV36F"#注册码
URL = "http://api.heclouds.com/register_de?register_code="+register_code
Body = {"title":"python_device","sn": "123456789"}
r = requests.post(URL,json=Body).json()
print(json.dumps(r, indent=2))
返回
{
"errno": 0,
"data": {
"device_id": "906087737",
"key": "Wh83vAaZGFee8ktcFDbZOTVoh90="
},
"error": "succ"
}
查询设备api-key可以使用产品级密钥Master-APIkey,也可以使用设备级密钥key,设备ID和设备级密钥可通过以上注册设备API获得,也可在设备管理页面查看。GET请求无需Body。
请求方式:GET
运行
#查询设备
import requests
import json
device_id = "906087737"#设备ID
key = "MrXYtMYoqeWzt5d=WQaCMja=6F8="#设备密钥
URL = "http://api.heclouds.com/devices/"+device_id
Header = {"api-key" : key}
r = requests.get(URL,headers=Header).json()
print(json.dumps(r, indent=2))
返回
{ "errno": 0, "data": { "protocol": "MQTT", "private": true, "create_time": "2022-03-12 14:53:26", "keys": [ { "title": "auto generated device key", "key": "Wh83vAaZGFee8ktcFDbZOTVoh90=" } ], "online": false, "id": "906087737", "auth_info": "123456789", "title": "python_device" }, "error": "succ" }
新增数据流Body中必填参数为数据流ID。
请求方式:POST
运行
#新增数据流
import requests
import json
device_id = "906087737"
key = "MrXYtMYoqeWzt5d=WQaCMja=6F8="#设备密钥
URL = "http://api.heclouds.com/devices/"+device_id+"/datastreams"
Header = {"api-key" : key}
Body = {"id": "temp"}
r = requests.post(URL,json=Body,headers=Header).json()
print(json.dumps(r, indent=2))
返回
{
"errno": 0,
"data": {
"ds_uuid": "d94a90f5-8bd0-5672-b967-14082115dd79"
},
"error": "succ"
}
上传数据点Body中按制定格式填写,可批量上传。
请求方式:POST
运行
#上传数据点
import requests
import json
device_id = "906087737"
key = "MrXYtMYoqeWzt5d=WQaCMja=6F8="#设备密钥
URL = "http://api.heclouds.com/devices/"+device_id+"/datapoints"
Header = {"api-key" : key}
Body = {"datastreams": [{"id": "temp","datapoints": [{"value": 62} ]}]}
r = requests.post(URL,json=Body,headers=Header).json()
print(json.dumps(r, indent=2))
返回
{
"errno": 0,
"error": "succ"
}
查询数据流,URL对应相应的数据流ID。
请求方式:GET
运行
#查询数据流
import requests
import json
device_id = "906087737"
key = "MrXYtMYoqeWzt5d=WQaCMja=6F8="#设备密钥
datastream_id = "temp"
URL = "http://api.heclouds.com/devices/"+device_id+"/datastreams/"+datastream_id
Header = {"api-key" : key}
r = requests.get(URL,headers=Header).json()
print(json.dumps(r, indent=2))
返回
{
"errno": 0,
"data": {
"update_at": "2022-03-12 15:21:08",
"id": "temp",
"create_time": "2022-03-12 15:11:14",
"current_value": 62
},
"error": "succ"
}
发布消息URL指定相应的Topic,api-key必须使用产品级密钥Master-APIkey。
请求方式:POST
运行
#发布消息
import requests
import json
MasterAPIkey = "MrXYtMYoqeWzt5d=WQaCMja=6F8="#产品核心密钥
URL = "http://api.heclouds.com/mqtt?topic=python_topic"
Header = {"api-key" : MasterAPIkey}
Body = "Hello mqtt!"
r = requests.post(URL,data=Body,headers=Header).json()
print(json.dumps(r, indent=2))
返回
{
"errno": 6,
"error": "invalid parameter: topic is not exists"
}
返回中提示topic不存在,因为没有设备订阅过topic,通过4调试MQTT订阅该topic后便不会出错。
运行
#新增设备 print("新增设备") import requests import json MasterAPIkey = "MrXYtMYoqeWzt5d=WQaCMja=6F8="#产品核心密钥 URL = "http://api.heclouds.com/devices" Header = {"api-key" : MasterAPIkey} Body = {"title": "test_device"} r = requests.post(URL,json=Body,headers=Header).json() print(json.dumps(r, indent=2)) #注册设备 print("注册设备") register_code = "r6jc2WyJluQrV36F"#注册码 URL = "http://api.heclouds.com/register_de?register_code="+register_code Body = {"title":"python_device","sn": "123456789"} r = requests.post(URL,json=Body).json() print(json.dumps(r, indent=2)) #查询设备 print("查询设备") device_id = r["data"]["device_id"]#设备ID key = r["data"]["key"]#设备密钥 URL = "http://api.heclouds.com/devices/"+device_id Header = {"api-key" : key} r = requests.get(URL,headers=Header).json() print(json.dumps(r, indent=2)) #新增数据流 print("新增数据流") datastream_id = "temp" URL = "http://api.heclouds.com/devices/"+device_id+"/datastreams" Header = {"api-key" : key} Body = {"id": datastream_id} r = requests.post(URL,json=Body,headers=Header).json() print(json.dumps(r, indent=2)) #上传数据点 print("上传数据点") URL = "http://api.heclouds.com/devices/"+device_id+"/datapoints" Header = {"api-key" : key} Body = {"datastreams": [{"id": datastream_id,"datapoints": [{"value": 62} ]}]} r = requests.post(URL,json=Body,headers=Header).json() print(json.dumps(r, indent=2)) #查询数据流 print("查询数据流") URL = "http://api.heclouds.com/devices/"+device_id+"/datastreams/"+datastream_id Header = {"api-key" : key} r = requests.get(URL,headers=Header).json() print(json.dumps(r, indent=2)) #发布消息 print("发布消息") URL = "http://api.heclouds.com/mqtt?topic=python_topic" Header = {"api-key" : MasterAPIkey} Body = "Hello mqtt!" r = requests.post(URL,data=Body,headers=Header).json() print(json.dumps(r, indent=2))
返回
新增设备 { "errno": 0, "data": { "device_id": "906147806" }, "error": "succ" } 注册设备 { "errno": 0, "data": { "device_id": "906147809", "key": "R7YqtyhNtN2JkeeR0Axo=B6jnEY=" }, "error": "succ" } 查询设备 { "errno": 0, "data": { "protocol": "MQTT", "private": true, "create_time": "2022-03-12 17:35:43", "keys": [ { "title": "auto generated device key", "key": "R7YqtyhNtN2JkeeR0Axo=B6jnEY=" } ], "online": false, "id": "906147809", "auth_info": "123456789", "title": "python_device" }, "error": "succ" } 新增数据流 { "errno": 0, "data": { "ds_uuid": "934076ef-adf9-53e3-9230-6159e3daedfc" }, "error": "succ" } 上传数据点 { "errno": 0, "error": "succ" } 查询数据流 { "errno": 0, "data": { "update_at": "2022-03-12 17:35:43", "id": "temp", "create_time": "2022-03-12 17:35:43", "current_value": 62 }, "error": "succ" } 发布消息 { "errno": 6, "error": "invalid parameter: no device subscribe to this topic" }
连接Onenet服务器,mqtt用户名和密码对应产品ID和鉴权信息
运行
import paho.mqtt.client as mqtt import random,json HOST = "183.230.40.39" PORT = 6002 Device_id = "906147809"#设备ID Product_id = "493938"#产品ID Mqtt用户名 Authorization = "123456789"#鉴权信息 Mqtt密码 def on_connect(client, userdata, flags, rc): print("连接结果:" + mqtt.connack_string(rc)) def on_message(client, userdata, msg): print(msg.topic+" "+str(msg.payload)) client = mqtt.Client(Device_id) client.username_pw_set(Product_id,Authorization) client.on_connect = on_connect#连接回调 client.on_message = on_message#消息回调 client.connect(HOST, PORT, 120) client.loop_forever()
返回
连接结果:Connection Accepted.
查看设备详情
订阅python_topic
client.subscribe("python_topic")
设备使⽤publish报⽂来上传数据点,需要往系统指定主题发送指定格式数据,这两点非常关键。见设备终端接入协议-MQTT V2.6
指定主题:$dp
指定格式数据:3字节报头 + 数据
3字节报头:
JSON格式2对应的数据格式较简单:{“datastream_id”:”value”}
将数据按json格式2打包:
# 转换为Json格式2字节数组
def payload_json2(json_data):
data_bytes = json.dumps(json_data).encode('utf-8')
byte1 = 0x03
byte2 = len(data_bytes)>>8
byte3 = len(data_bytes)%256
payload = bytes([byte1,byte2,byte3]) + data_bytes
return payload
上传数据点
# 上传数据点
value = random.randint(0,100)#生成随机整数
data = {"temp":value}
payload = payload_json2(data)
client.publish("$dp", payload, qos=0)
import paho.mqtt.client as mqtt import random,json HOST = "183.230.40.39" PORT = 6002 Device_id = "906147809"#设备ID Product_id = "493938"#产品ID Mqtt用户名 Authorization = "123456789"#鉴权信息 Mqtt密码 # 转换为Json格式2字节数组 def payload_json2(json_data): data_bytes = json.dumps(json_data).encode('utf-8') byte1 = 0x03 byte2 = len(data_bytes)>>8 byte3 = len(data_bytes)%256 payload = bytes([byte1,byte2,byte3]) + data_bytes return payload def on_connect(client, userdata, flags, rc): print("连接结果:" + mqtt.connack_string(rc)) client.subscribe("python_topic")#订阅python_topic def on_message(client, userdata, msg): print(msg.topic+" "+str(msg.payload)) if msg.topic == "python_topic": # 上传数据点 value = random.randint(0,100) data = {"temp":value} payload = payload_json2(data) client.publish("$dp", payload, qos=0) client = mqtt.Client(Device_id) client.username_pw_set(Product_id,Authorization) client.on_connect = on_connect#连接回调 client.on_message = on_message#消息回调 client.connect(HOST, PORT, 120) client.loop_forever()
通过实验找到了一种简洁使用Onenet平台MQTT协议开发个人智能设备的解决方案
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。