赞
踩
本文章为原创,转载请注明出处!
账号:iotos_test 密码:iotos123
代码地址:IOTOSDK-Python: IOTOS Python版本SDK,自带原生接口和采集引擎 (gitee.com)
目录
BACnet是用于智能建筑的通信协议,是国标标准化组织(ISO)、美国国家标准协会(ANSI)及美国采暖、制冷与空调工程师学会(ASHRAE)定义的通信协议。BACnet针对智能建筑及控制系统的应用所设计的通信,可用在暖通空调系统(HVAC,包括暖气、通风、空气调节),也可以用在照明控制、门禁系统、火警侦测系统及其相关的设备。优点在于能降低维护系统所需成本并且安装比一般工业通信协议更为简易,而且提供有五种业界常用的标准协议,此可防止设备供应商及系统业者的垄断,也因此未来系统扩展性与兼容性大为增加。
将BACnet协议设备的数据拿到并上传至云上
BACnet协议设备
- #coding=utf-8
- import sys
- sys.path.append("..")
- import BAC0
- import time
- from driver import *
-
- class Bacnet(IOTOSDriverI):
- def InitComm(self,attrs):
- self.setPauseCollect(False)
- self.setCollectingOneCircle=True
- self.online(True)
-
- #建立连接并且在通路里搜索bacnet设备的ip和设备id
- try:
- self.bacnet=BAC0.connect()
- self.bacnet.whois()
-
- # 搜索局域网内的bacnet协议设备并且打印出来
- for each in self.bacnet.discoveredDevices:
- deviceName = (self.bacnet.read('%s device %s objectName' % (each[0], each[1])))
- self.deviceAddr = each[0]
- self.debug('Found device : %s at address %s' % (deviceName, self.deviceAddr))
- # 打印设备地址为deviceAddr 的objectList property 前十个
- read_pro = self.deviceAddr + ' device 3 objectList'
- self.debug(self.bacnet.read(read_pro)[:10])
-
- except Exception as e:
- self.bacnet.disconnect()
-
- def Collecting(self,dataId):
- try:
- cfgtmp = self.data2attrs[dataId]['config']
- #过滤掉非采集点
- if cfgtmp["param"] == "":
- return ()
-
- # 过滤采集点
- if 'disabled' in cfgtmp and cfgtmp['disabled'] == True:
- return ()
- else:
- self.debug(self.name(dataId))
-
- #获取用于数据下发的点
- if 'private' in cfgtmp['param'] and cfgtmp['param']['private']=='write' and 'num' in cfgtmp['param']:
- if "memoryvalue" not in self.data2attrs[dataId]:
- return ('请下发',)
- else:
- return (self.data2attrs[dataId]["memoryvalue"],)
-
- #上传数据点
- if 'objectName' in cfgtmp['param'] and 'num' in cfgtmp['param']:
- data_val = self.deviceAddr + ' ' + str(cfgtmp['param']['objectName'])+' '+str(cfgtmp['param']['num'])+' '+'objectName description presentValue units'
- self.debug(data_val)
- #读取bacnet设备中属性的值
- data = self.bacnet.readMultiple(data_val)
- return (str(data[2]),)
-
- except Exception as e:
- # 连接会一定时间后断开,需要再次开启
- self.bacnet.disconnect()
- self.bacnet = BAC0.connect()
- self.bacnet.whois()
- return ()
-
- def Event_setData(self, dataId, value):
- #更改bacnet里面属性的值,一般只能是analoValue属性
- if 'private' in self.data2attrs[dataId]['config']['param']:
- if self.data2attrs[dataId]['config']['param']['private']== 'write':
- data_wri=self.deviceAddr+' '+'analogValue'+' '+ str(self.data2attrs[dataId]['config']['param']['num']) +' presentValue ' + str(value)
- self.debug(data_wri)
- self.bacnet.write(data_wri)
- self.setValue(self.name(dataId), value)
-
- return json.dumps({'code': 0, 'msg': '', 'data': ''})

- #coding=utf-8
- import sys
- sys.path.append("..")
- import BAC0
- import time
- from driver import *
- class Bacnet(IOTOSDriverI):
- def InitComm(self,attrs):
- self.setPauseCollect(False)
- self.setCollectingOneCircle=True
- self.online(True)
-
- #建立连接并且在通路里搜索bacnet设备的ip和设备id
- try:
- self.bacnet=BAC0.connect()
- self.bacnet.whois()
-
- # 搜索局域网内的bacnet协议设备并且打印出来
- for each in self.bacnet.discoveredDevices:
- deviceName = (self.bacnet.read('%s device %s objectName' % (each[0], each[1])))
- self.deviceAddr = each[0]
- self.debug('Found device : %s at address %s' % (deviceName, self.deviceAddr))
- # 打印设备地址为deviceAddr 的objectList property 前十个
- read_pro = self.deviceAddr + ' device 3 objectList'
- self.debug(self.bacnet.read(read_pro)[:10])
-
- except Exception as e:
- self.bacnet.disconnect()

- def Collecting(self,dataId):
- try:
- cfgtmp = self.data2attrs[dataId]['config']
- #过滤掉非采集点
- if cfgtmp["param"] == "":
- return ()
-
- # 过滤采集点
- if 'disabled' in cfgtmp and cfgtmp['disabled'] == True:
- return ()
- else:
- self.debug(self.name(dataId))
-
- #获取用于数据下发的点
- if 'private' in cfgtmp['param'] and cfgtmp['param']['private']=='write' and 'num' in cfgtmp['param']:
- if "memoryvalue" not in self.data2attrs[dataId]:
- return ('请下发',)
- else:
- return (self.data2attrs[dataId]["memoryvalue"],)
-
- #上传数据点
- if 'objectName' in cfgtmp['param'] and 'num' in cfgtmp['param']:
- data_val = self.deviceAddr + ' ' + str(cfgtmp['param']['objectName'])+' '+str(cfgtmp['param']['num'])+' '+'objectName description presentValue units'
- self.debug(data_val)
- #读取bacnet设备中属性的值
- data = self.bacnet.readMultiple(data_val)
- return (str(data[2]),)
-
- except Exception as e:
- # 连接会一定时间后断开,需要再次开启
- self.bacnet.disconnect()
- self.bacnet = BAC0.connect()
- self.bacnet.whois()
- return ()

- def Event_setData(self, dataId, value):
- #更改bacnet里面属性的值,一般只能是analoValue属性
- if 'private' in self.data2attrs[dataId]['config']['param']:
- if self.data2attrs[dataId]['config']['param']['private']== 'write':
- data_wri=self.deviceAddr+' '+'analogValue'+' '+ str(self.data2attrs[dataId]['config']['param']['num']) +' presentValue ' + str(value)
- self.debug(data_wri)
- self.bacnet.write(data_wri)
- self.setValue(self.name(dataId), value)
-
- return json.dumps({'code': 0, 'msg': '', 'data': ''})
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。