当前位置:   article > 正文

python通过KepServer读取plc数据_kepserve 可以直接读取plc数据么

kepserve 可以直接读取plc数据么

KepServer是一款能够方便地对plc等工业设备进行数据读取的工具。Python可使用win32com模块实现通过kepserver读取plc数据的功能。

win32com模块的安装命令:pip install pypiwin32

一、运行环境

1.安装Python:python3各版本均可,但必须是32位。这里使用的是 python-3.7.9,windows下32位。下载:

python-3.7.9-32位安装程序.rar-互联网文档类资源-CSDN下载

2.安装KepServer:各版本均可。这里使用的是Kepware.KEPServerEX.V6,下载:

KEPServerEXV6.4.rar_python读写kepware数据-其它文档类资源-CSDN下载

在KepServer中新建Channel1 -> Device1,建立tag1、tag2、tag3三个变量,连接plc后,在QC下查看数据是否能够正常读取。

3.注册opcdaauto

将opcdaauto.dll文件置于C:\Windows\System32目录下(使用64位环境时,置于SysWOW64目录下),cmd执行命令:regsvr32 C:\Windows\System32\opcdaauto.dll。下载:

注册opcdaauto.rar-互联网文档类资源-CSDN下载

二、python读取plc数据

  1. import logging
  2. import re
  3. import time
  4. from typing import List
  5. import win32com.client
  6. from win32com.client import gencache
  7. logging.basicConfig(level='DEBUG')
  8. logger = logging.getLogger('dll_dispatch')
  9. #加载dll对象
  10. OPC_DA_DLL = gencache.EnsureModule('{28E68F91-8D75-11D1-8DC3-3C302A000000}', 0, 1, 0)
  11. opcServer = OPC_DA_DLL.OPCServer() #获取opcServer对象
  12. group_name_map = {} # 存放已添加的组名
  13. item_name_map = {}
  14. Groups = None
  15. def connect_opc(progID: str, node: str = '127.0.0.1'):
  16. opcServer.Connect(progID, node)
  17. logger.info('已连接到opc - [{}:{}]'.format(node, progID))
  18. return opcServer
  19. def disconnect_opc(opcServer):
  20. logger.info('opc断开连接')
  21. opcServer.Disconnect()
  22. def get_servers(opcServer) -> List[str]:
  23. """查询可用opc服务"""
  24. return opcServer.GetOPCServers()
  25. class GroupProperty:
  26. # DeadBand
  27. IsActive: bool = True
  28. IsSubscribed: bool = True
  29. UpdateRate: int = 1000
  30. DefaultGroupIsActive: bool = True
  31. DefaultGroupDeadband: int = 0
  32. def get_groups(opcServer, groupProperty: GroupProperty):
  33. global Groups
  34. if not Groups:
  35. Groups = opcServer.OPCGroups
  36. Groups.DefaultGroupIsActive = groupProperty.IsActive
  37. Groups.DefaultGroupDeadband = groupProperty.DefaultGroupDeadband
  38. Groups.DefaultGroupUpdateRate = groupProperty.UpdateRate
  39. return Groups
  40. def get_group(opcServer, opcGroupName: str, groupProperty: GroupProperty):
  41. opcGroups = get_groups(opcServer, GroupProperty())
  42. if opcGroupName not in group_name_map:
  43. opcGroup = opcGroups.Add(opcGroupName)
  44. opcGroup.IsActive = groupProperty.IsActive
  45. opcGroup.UpdateRate = groupProperty.UpdateRate
  46. opcGroup.IsSubscribed = groupProperty.IsSubscribed
  47. group_name_map[opcGroupName] = opcGroup
  48. else:
  49. logger.debug('重复添加 GroupName - {}'.format(opcGroupName))
  50. return group_name_map[opcGroupName]
  51. def get_items(opcServer, opcGroupName: str, groupProperty: GroupProperty):
  52. group = get_group(opcServer, opcGroupName, groupProperty)
  53. return group.OPCItems
  54. def add_item(opcServer, itemName: str):
  55. """添加一个点位"""
  56. if itemName not in item_name_map:
  57. try:
  58. groupName = re.split(r'[.$][^.$]*?$', itemName, 1)[0]
  59. except IndexError:
  60. logger.error('点位名-{}-不符合规则'.format(itemName))
  61. return
  62. items = get_items(opcServer, groupName, GroupProperty())
  63. try:
  64. item = items.AddItem(itemName, len(item_name_map) + 1)
  65. # 添加失败时 item 为 None
  66. if not items:
  67. raise RuntimeError('add item[{}] return None'.format(itemName))
  68. except Exception as e:
  69. logger.error('添加点位-{}-失败! [点位可能不存在]'.format(itemName), exc_info=True)
  70. item_name_map[itemName] = None
  71. else:
  72. item.IsActive = True
  73. time.sleep(0.1)
  74. item_name_map[itemName] = item
  75. else:
  76. logger.debug('重复添加 itemName - {}'.format(itemName))
  77. def add_items(opcServer, item_list: List[str]):
  78. """批量添加点位"""
  79. for item in item_list:
  80. add_item(opcServer, item)
  81. def _sync_read(item):
  82. data = item.Read(win32com.client.constants.OPCDevice, 0, 0, 0)
  83. if not data:
  84. raise ValueError('sync_read return None.')
  85. return data
  86. def sync_read_item(opcServer, itemName: str):
  87. """
  88. 同步读取一个 opc 点位
  89. :param opcServer:
  90. :param itemName:
  91. :return: (数据, 数据质量, 时间)
  92. """
  93. if itemName not in item_name_map:
  94. add_item(opcServer, itemName)
  95. logger.debug('读取 - {}'.format(itemName))
  96. item = item_name_map[itemName]
  97. try:
  98. if not item:
  99. raise ValueError('点位不存')
  100. return _sync_read(item)
  101. except Exception as e:
  102. logger.error('采集失败: {} - {}'.format(itemName, e), exc_info=True)
  103. return None, 0, 0
  104. def sync_read_items(opcServer, itemNameList: List[str]):
  105. result = []
  106. for itemName in itemNameList:
  107. if itemName not in item_name_map:
  108. add_item(opcServer, itemName)
  109. result.append(sync_read_item(opcServer, itemName))
  110. return result
  111. if __name__ == '__main__':
  112. opcServer = connect_opc('Kepware.KEPServerEX.V6', '127.0.0.1')
  113. print(get_servers(opcServer))
  114. # 定义点位列表
  115. tag_list = ['Channel1.Device1.tag1', 'Channel1.Device1.tag2', 'Channel1.Device1.tag3']
  116. try:
  117. while True:
  118. data = sync_read_items(opcServer, tag_list)
  119. for i in data:
  120. print(i[0])
  121. time.sleep(2)
  122. except Exception:
  123. logger.error('异常退出', exc_info=True)
  124. finally:
  125. opcServer.Disconnect()

主要参考

  1. https://blog.csdn.net/wanzheng_96/article/details/108431097?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-6.control&dist_request_id=1332042.24142.16193357577789097&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-6.control

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/455818
推荐阅读
相关标签
  

闽ICP备14008679号