赞
踩
KepServer是一款能够方便地读取PLC等工业设备数据的工具。Python可以借助win32com模块,通过KepServer读取PLC数据。
win32com模块的安装命令:pip install pywin32(旧包名 pypiwin32 同样可用,它只是一个依赖 pywin32 的包装包)
1.安装Python:python3各版本均可,但必须是32位。这里使用的是 python-3.7.9,windows下32位。下载:
python-3.7.9-32位安装程序.rar-互联网文档类资源-CSDN下载
2.安装KepServer:各版本均可。这里使用的是Kepware.KEPServerEX.V6,下载:
KEPServerEXV6.4.rar_python读写kepware数据-其它文档类资源-CSDN下载
在KepServer中新建Channel1 -> Device1,建立tag1、tag2、tag3三个变量,连接PLC后,在Quick Client(QC)中查看数据是否能够正常读取。
3.注册opcdaauto
将opcdaauto.dll文件置于C:\Windows\System32目录下(使用64位系统时,置于C:\Windows\SysWOW64目录下),然后以管理员身份在cmd中执行注册命令:regsvr32 C:\Windows\System32\opcdaauto.dll(64位系统下将路径相应改为C:\Windows\SysWOW64\opcdaauto.dll)。下载:
注册opcdaauto.rar-互联网文档类资源-CSDN下载
- import logging
- import re
- import time
- from typing import List
-
- import win32com.client
- from win32com.client import gencache
-
# Module-wide logging: everything from DEBUG upwards is emitted.
logging.basicConfig(level='DEBUG')
logger = logging.getLogger('dll_dispatch')

# Load the generated COM wrapper for the type library with this CLSID
# (lcid 0, version 1.0) and create the shared OPCServer automation object.
OPC_DA_DLL = gencache.EnsureModule('{28E68F91-8D75-11D1-8DC3-3C302A000000}', 0, 1, 0)
opcServer = OPC_DA_DLL.OPCServer()

# Caches shared by the helper functions below.
group_name_map = dict()  # group name -> group object that was already added
item_name_map = dict()  # item name -> item object, or None when adding failed
Groups = None  # groups collection, fetched lazily by get_groups
-
def connect_opc(progID: str, node: str = '127.0.0.1'):
    """Connect the module-level OPC server object and return it.

    :param progID: ProgID of the OPC server to connect to.
    :param node: host the server runs on; defaults to the local machine.
    :return: the module-level ``opcServer`` object after ``Connect`` was called.
    """
    opcServer.Connect(progID, node)
    message = '已连接到opc - [{}:{}]'.format(node, progID)
    logger.info(message)
    return opcServer
-
def disconnect_opc(opcServer):
    """Log the disconnect, then close the connection of the given server.

    :param opcServer: OPC server object whose ``Disconnect`` is called.
    """
    server = opcServer
    logger.info('opc断开连接')
    server.Disconnect()
-
def get_servers(opcServer) -> List[str]:
    """Return the available OPC servers as reported by ``GetOPCServers``.

    :param opcServer: OPC server object to query.
    :return: the result of ``opcServer.GetOPCServers()``, unchanged.
    """
    available = opcServer.GetOPCServers()
    return available
-
class GroupProperty:
    """Settings bundle handed to the group helpers (get_groups / get_group)."""

    # Copied onto each newly added group by get_group.
    IsActive: bool = True
    IsSubscribed: bool = True
    UpdateRate: int = 1000  # presumably milliseconds - TODO confirm

    # Defaults meant for the groups collection itself (dead band etc.).
    DefaultGroupIsActive: bool = True
    DefaultGroupDeadband: int = 0
-
def get_groups(opcServer, groupProperty: GroupProperty):
    """Return the groups collection, fetching and configuring it only once.

    The collection is cached in the module-level ``Groups`` variable; the
    defaults are applied only when it is first fetched.

    :param opcServer: OPC server object providing ``OPCGroups``.
    :param groupProperty: supplies the collection-wide default settings.
    :return: the cached groups collection.
    """
    global Groups
    # Test against None explicitly: a COM collection with no groups yet may
    # evaluate as falsy, which would re-fetch and re-configure it every call.
    if Groups is None:
        Groups = opcServer.OPCGroups
        # Use the dedicated collection default rather than the per-group flag.
        Groups.DefaultGroupIsActive = groupProperty.DefaultGroupIsActive
        Groups.DefaultGroupDeadband = groupProperty.DefaultGroupDeadband
        Groups.DefaultGroupUpdateRate = groupProperty.UpdateRate
    return Groups
-
def get_group(opcServer, opcGroupName: str, groupProperty: GroupProperty):
    """Return the group with the given name, adding it on first request.

    Groups that were added before are served from ``group_name_map``.

    :param opcServer: OPC server object the group belongs to.
    :param opcGroupName: name of the group to fetch or create.
    :param groupProperty: settings copied onto a newly added group.
    :return: the group object stored in ``group_name_map`` for this name.
    """
    # The collection defaults always come from a fresh GroupProperty(),
    # not from the groupProperty argument.
    collection = get_groups(opcServer, GroupProperty())

    if opcGroupName in group_name_map:
        logger.debug('重复添加 GroupName - {}'.format(opcGroupName))
        return group_name_map[opcGroupName]

    created = collection.Add(opcGroupName)
    created.IsActive = groupProperty.IsActive
    created.UpdateRate = groupProperty.UpdateRate
    created.IsSubscribed = groupProperty.IsSubscribed
    group_name_map[opcGroupName] = created
    return created
-
-
def get_items(opcServer, opcGroupName: str, groupProperty: GroupProperty):
    """Return the ``OPCItems`` collection of the named group.

    The group is looked up (or created) through get_group.

    :param opcServer: OPC server object the group belongs to.
    :param opcGroupName: name of the group whose items are wanted.
    :param groupProperty: settings forwarded to get_group.
    :return: the group's ``OPCItems`` attribute.
    """
    return get_group(opcServer, opcGroupName, groupProperty).OPCItems
-
-
def add_item(opcServer, itemName: str):
    """Register one item (tag) with its group and cache the outcome.

    The group name is the part of ``itemName`` before its last ``.`` or ``$``
    separator. The outcome is stored in ``item_name_map``: the item object on
    success, ``None`` when the name is malformed or adding it fails.

    :param opcServer: OPC server object the item is added to.
    :param itemName: full item name, e.g. ``Channel1.Device1.tag1``.
    :return: None
    """
    if itemName in item_name_map:
        logger.debug('重复添加 itemName - {}'.format(itemName))
        return

    # re.split never raises for a name without a separator; it returns the
    # name unchanged as a single element, so check the number of parts.
    parts = re.split(r'[.$][^.$]*?$', itemName, 1)
    if len(parts) < 2:
        logger.error('点位名-{}-不符合规则'.format(itemName))
        # Record the failure so that later lookups find an entry.
        item_name_map[itemName] = None
        return

    items = get_items(opcServer, parts[0], GroupProperty())
    try:
        # The second argument is a per-item handle, numbered in order of addition.
        item = items.AddItem(itemName, len(item_name_map) + 1)
        # A failed add is expected to yield None.
        if item is None:
            raise RuntimeError('add item[{}] return None'.format(itemName))
    except Exception:
        logger.error('添加点位-{}-失败! [点位可能不存在]'.format(itemName), exc_info=True)
        item_name_map[itemName] = None
    else:
        item.IsActive = True
        # Presumably gives the server time to activate the item - TODO confirm.
        time.sleep(0.1)
        item_name_map[itemName] = item
-
-
def add_items(opcServer, item_list: List[str]):
    """Register several items, one add_item call per name, in list order.

    :param opcServer: OPC server object the items are added to.
    :param item_list: item names to register.
    """
    for name in item_list:
        add_item(opcServer, name)
-
-
def _sync_read(item):
    """Read one item synchronously with the ``OPCDevice`` source constant.

    :param item: item object whose ``Read`` method is called.
    :return: whatever ``item.Read`` returns; callers treat it as
        (value, quality, timestamp).
    :raises ValueError: when the read returns a falsy result.
    """
    source = win32com.client.constants.OPCDevice
    result = item.Read(source, 0, 0, 0)
    if result:
        return result
    raise ValueError('sync_read return None.')
-
def sync_read_item(opcServer, itemName: str):
    """
    Synchronously read one OPC item, registering it first when necessary.

    :param opcServer: OPC server object the item belongs to.
    :param itemName: full name of the item to read.
    :return: (value, quality, timestamp); ``(None, 0, 0)`` when the item is
        unavailable or the read fails.
    """
    if itemName not in item_name_map:
        add_item(opcServer, itemName)
    logger.debug('读取 - {}'.format(itemName))
    # Use .get so that a missing entry is reported through the failure path
    # below instead of escaping as a KeyError.
    item = item_name_map.get(itemName)
    try:
        if item is None:
            raise ValueError('点位不存在')
        return _sync_read(item)
    except Exception as e:
        logger.error('采集失败: {} - {}'.format(itemName, e), exc_info=True)
        return None, 0, 0
-
-
def sync_read_items(opcServer, itemNameList: List[str]):
    """Synchronously read several OPC items and return the results in order.

    :param opcServer: OPC server object the items belong to.
    :param itemNameList: item names to read.
    :return: list with one sync_read_item result per name.
    """
    # sync_read_item registers unknown items itself, so no separate add pass
    # is needed here.
    return [sync_read_item(opcServer, name) for name in itemNameList]
-
if __name__ == '__main__':
    # Demo: connect to a local KEPServerEX and print three tag values
    # every two seconds until interrupted.
    opcServer = connect_opc('Kepware.KEPServerEX.V6', '127.0.0.1')
    print(get_servers(opcServer))
    # Tags to poll.
    tag_list = ['Channel1.Device1.tag1', 'Channel1.Device1.tag2', 'Channel1.Device1.tag3']
    try:
        while True:
            for reading in sync_read_items(opcServer, tag_list):
                # Each reading is (value, quality, timestamp); print the value.
                print(reading[0])
            time.sleep(2)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the endless polling loop.
        logger.info('用户中断, 停止采集')
    except Exception:
        logger.error('异常退出', exc_info=True)
    finally:
        # Go through the helper so the disconnect is logged like elsewhere.
        disconnect_opc(opcServer)
主要参考
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。