赞
踩
最近在打比赛,做了一个项目,本人有幸主要负责云端数据部分,在此写一下我的工作流程
我们做的是驾驶员疲劳检测,此处利用了一个大佬的源代码并做出了一些修改
源代码链接
在原作者基础上导入了一个包并就“驾驶员疲劳,玩手机,喝水,吸烟”四个状态及进行定义
import csv
#云端数据
fatigue_driving = 0
phone = 0
drink = 0
smoke = 0
修改摄像头类的代码块
# 在函数中引入定义的全局变量
global fatigue_driving,phone,drink,smoke
如果成功检测,添加
#云端数据实时修改
if "phone" in lab:
phone = 1
else:
phone = 0
if "drink" in lab:
drink = 1
else:
drink = 0
if "smoke" in lab:
smoke = 1
else:
smoke = 0
在函数末尾将数据导入预先存好的excle中,filename是你自己的文件名
fatigue_driving | 1 |
---|---|
phone | 0 |
drink | 0 |
smoke | 0 |
# 打开CSV文件 with open(filename, 'r', newline='') as csvfile: # 读取CSV文件内容 csvreader = csv.reader(csvfile) # 转换为列表,方便修改数据 data = list(csvreader) #print(data) yunshuju = [fatigue_driving, phone, drink, smoke] # 修改每一行第二列的数字 for k in range(len(yunshuju)): # 假设第二列是数字类型 data[k][1] =yunshuju[k] #print(data) # 保存修改后的数据回到文件 with open(filename, 'w', newline='') as csvfile: # 创建CSV写入对象 csvwriter = csv.writer(csvfile) # 写入修改后的数据 csvwriter.writerows(data)
修改之后,就可以实时改变excle中的数据了
华为云
选择产品设备接入IoTDA
协议类型MQTT,数据格式JSON,其他随意
自定义模型
添加属性名称,定义他们的性质
最后效果
注册设备,随便填一个设备标识码和密钥,记住他们,其实不记住也行,因为确定之后华为云会生成一个文件DEVICES-KEY.txt,生成之后开始应该是未激活
IoT Device SDK(Python)提供设备接入华为云IoT物联网平台的Python版本的SDK,提供设备和平台之间通讯能力,以及设备服务、网关服务、OTA等高级服务,并且针对各种场景提供了丰富的demo代码。具体内容可以参考IoT Device SDK(Python)使用指南,我们还需要他里面的包iot_device_sdk_python
from __future__ import absolute_import from iot_device_sdk_python.client.listener.default_publish_action_listener import DefaultPublishActionListener from typing import List import time import logging from iot_device_sdk_python.client.client_conf import ClientConf from iot_device_sdk_python.client.connect_auth_info import ConnectAuthInfo from iot_device_sdk_python.iot_device import IotDevice from iot_device_sdk_python.client.listener.property_listener import PropertyListener from iot_device_sdk_python.client.request.service_property import ServiceProperty from iot_device_sdk_python.client import iot_result logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(threadName)s - %(filename)s[%(funcName)s] - %(levelname)s: %(message)s") logger = logging.getLogger(__name__)
属性上报指的是设备将当前属性值上报给平台。属性设置指的是平台设置设备的属性值
创建设备,并且设置无限循环while True,设置循环时间time.sleep(1)
class cscDetector: _logger = logging.getLogger(__name__) def __init__(self, server_uri, port, device_id, secret, iot_cert_file): self.server_uri = server_uri self.port = port self.device_id = device_id self.secret = secret self.iot_cert_file = iot_cert_file def start(self): """ 创建设备 """ connect_auth_info = ConnectAuthInfo() connect_auth_info.server_uri = self.server_uri connect_auth_info.port = self.port connect_auth_info.id = self.device_id connect_auth_info.secret = self.secret connect_auth_info.iot_cert_path = self.iot_cert_file connect_auth_info.bs_mode = ConnectAuthInfo.BS_MODE_DIRECT_CONNECT client_conf = ClientConf(connect_auth_info) device = IotDevice(client_conf) device.get_client().set_properties_listener(IICBListener(PropertyListener)) if device.connect() != 0: logger.error("init failed") return # 10s后上报一次设备的属性 time.sleep(10) # 按照产品模型设置属性 service_property = ServiceProperty() service_property.service_id = "csc" service_property.properties = {"fatigue_driving": 0, "phone": 0, "drink": 0, "smoke": 0} # 组装成列表的形式 services = [service_property] # 上报设备属性 device.get_client().report_properties(services) while True: service_property = ServiceProperty() service_property.service_id = "csc" data_upload = Upload_data_read(filename) print(data_upload) service_property.properties = {"fatigue_driving": data_upload.get('fatigue_driving'), "phone": data_upload.get('phone'), "drink": data_upload.get('drink'), "smoke": data_upload.get('smoke')} # 组装成列表的形式 services = [service_property] device.get_client().report_properties(services, DefaultPublishActionListener()) time.sleep(1)
附:数据阅读函数
def Upload_data_read(filename):
data_file = open(filename, encoding='utf-8')
data_upload = dict()
for line in data_file.readlines():
curLine = line.split(',')
data_upload[curLine[0]]= int(curLine[1])
return data_upload
定义类cscListener,两个方法:on_property_set方法处理写属性,on_property_get方法处理读属性。
class cscListener(PropertyListener): def __init__(self, iot_device: IotDevice): """ 传入一个IotDevice实例 """ self.device = iot_device def on_property_set(self, request_id: str, services: List[ServiceProperty]): """ 处理写属性 :param request_id: 请求id :param services: List<ServiceProperty> """ """ 遍历service """ for service_property in services: logger.info("on_property_set, service_id:" + service_property.service_id) """ 遍历属性 """ for property_name in service_property.properties: logger.info("set property name:" + property_name) logger.info("set property value:" + str(service_property.properties[property_name])) self.device.get_client().respond_properties_set(request_id, iot_result.SUCCESS) def on_property_get(self, request_id: str, service_id: str): """ 处理读属性。多数场景下,用户可以直接从平台读设备影子,此接口不用实现。 但如果需要支持从设备实时读属性,则需要实现此接口。 :param request_id: 请求id :param service_id: 服务id,可选 """ service_property = ServiceProperty() service_property.service_id = "csc" data_upload = Upload_data_read(filename) service_property.properties = {"fatigue_driving": data_upload.get('fatigue_driving'), "phone": data_upload.get('phone'), "drink": data_upload.get('drink'), "smoke": data_upload.get('smoke')} services = [service_property] self.device.get_client().respond_properties_get(request_id, services)
根据自己的信息填写
def main():
server_uri = ""
port = 8883
device_id = ""
secret = ""
iot_ca_cert_path = "./resources/GlobalSignRSAOVSSLCA2018.crt.pem"
IICB_detector = IICBDetector(server_uri=server_uri,
port=port,
device_id=device_id,
secret=secret,
iot_cert_file=iot_ca_cert_path)
IICB_detector.start()
同时运行两个函数
云平台端
呦呵,收获不错
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。