赞
踩
python3.7 - 32 位!32 位!32 位!
KEPServerEX.6.4
下载地址:https://pan.baidu.com/s/1R4bC-NKLEl4s4FqRfhAIwQ
提取码:07yy
OPCDAAuto.dll
C:\Windows\System32
目录下,检查系统是否经有同名文件(忽略大小写),如果没有或者 在使用中发生注册失败的错误,请先下载一份,保存路径随意关于软件是使用不做介绍,你也可以使用软件预置好的测试工程,比如点位: 通道 1.设备 1.TAG1
进入存放 OPCDAAuto.dll 文件的路径,在cmd下执行以下命令
regsvr32 OPCDAAuto.dll
提示注册成功,或者已经注册即可进行下一步编程。
import win32com.client
from win32com.client import DispatchWithEvents
from win32com.client import gencache
如果你的程序要通过pyistaller打包成可执行文件,需要额外导入以下包,防止编译完后由于module 缺失导致运行失败
import win32timezone
from win32com.client import Dispatch
from win32com.client import DispatchWithEvents
OPC_DA_DLL = gencache.EnsureModule('{28E68F91-8D75-11D1-8DC3-3C302A000000}', 0, 1, 0)
如果此处提示错误:
com_error: (-2147221164, '没有注册类', None, None)
,请先按照上面步骤 注册dll 到系统中
opcServer = OPC_DA_DLL.OPCServer()
如果此处提示错误信息为:
com_error: (-2147221164, '没有注册类', None, None)
那就先检查你的python版本是不是 32位的
node = '127.0.0.1'
for svr in opcServer.GetOPCServers(node):
print(svr)
# 本机只装了一个模拟器:Kepware.KEPServerEX.V6
Kepware.KEPServerEX.V6
progID = 'Kepware.KEPServerEX.V6'
node = '127.0.0.1'
opcServer.Connect(progID, node)
groups=opcServer.OPCGroups
groups.DefaultGroupIsActive = True
groups.DefaultGroupDeadband = 0
groups.DefaultGroupUpdateRate = 200
# 组名
group_name = '通道 1.设备 1'
# 同一组名不要重复添加,group 对象可以先保存到list或者dict里
group = groups.Add(group_name)
group.IsActive = True
group.IsSubscribed = True
group.UpdateRate = 100
group
<win32com.gen_py.None.OPCGroup>
items = group.OPCItems
tag_map = {}
tag = "通道 1.设备 1.TAG1"
# item 对象需要保存到list或者dict里,后续数据读写都要通过该对象
# 注意第二个参数从1开始, 每添加一个点位就+ 1
item = items.AddItem(tag, len(tag_map) + 1)
# 点位不存在回返回None
if item:
item.IsActive = True
tag_map[tag] = item
返回一个元组,(值,数据质量,时间),比如
data = item.Read(win32com.client.constants.OPCDevice, 0, 0, 0)
data
(200,
192,
pywintypes.datetime(2020, 9, 6, 6, 18, tzinfo=TimeZoneInfo('GMT Standard Time', True)))
无返回值
item.Write(200)
opcServer.Disconnect()
import logging import re import time from typing import List import win32com.client from win32com.client import gencache # import win32timezone # from win32com.client import Dispatch # from win32com.client import DispatchWithEvents logging.basicConfig(level='DEBUG') logger = logging.getLogger('dll_dispatch') OPC_DA_DLL = gencache.EnsureModule('{28E68F91-8D75-11D1-8DC3-3C302A000000}', 0, 1, 0) opcServer = OPC_DA_DLL.OPCServer() group_name_map = {} # 存放已添加的组名 item_name_map = {} Groups = None def connect_opc(progID: str, node: str = '127.0.0.1'): opcServer.Connect(progID, node) logger.info('已连接到opc - [{}:{}]'.format(node, progID)) return opcServer def disconnect_opc(opcServer): logger.info('opc断开连接') opcServer.Disconnect() def get_servers(opcServer) -> List[str]: """查询可用opc服务""" return opcServer.GetOPCServers() class GroupProperty: # DeadBand IsActive: bool = True IsSubscribed: bool = True UpdateRate: int = 1000 DefaultGroupIsActive: bool = True DefaultGroupDeadband: int = 0 # DefaultGroupProperty = GroupProperty() def get_groups(opcServer, groupProperty: GroupProperty): global Groups if not Groups: Groups = opcServer.OPCGroups Groups.DefaultGroupIsActive = groupProperty.IsActive Groups.DefaultGroupDeadband = groupProperty.DefaultGroupDeadband Groups.DefaultGroupUpdateRate = groupProperty.UpdateRate return Groups def get_group(opcServer, opcGroupName: str, groupProperty: GroupProperty): opcGroups = get_groups(opcServer, GroupProperty()) if opcGroupName not in group_name_map: opcGroup = opcGroups.Add(opcGroupName) opcGroup.IsActive = groupProperty.IsActive opcGroup.UpdateRate = groupProperty.UpdateRate opcGroup.IsSubscribed = groupProperty.IsSubscribed group_name_map[opcGroupName] = opcGroup else: logger.debug('重复添加 GroupName - {}'.format(opcGroupName)) return group_name_map[opcGroupName] def get_items(opcServer, opcGroupName: str, groupProperty: GroupProperty): group = get_group(opcServer, opcGroupName, groupProperty) return group.OPCItems def add_item(opcServer, itemName: str): """添加一个点位""" if itemName not in item_name_map: try: groupName = re.split(r'[.$][^.$]*?$', itemName, 1)[0] except IndexError: logger.error('点位名-{}-不符合规则'.format(itemName)) return items = get_items(opcServer, groupName, GroupProperty()) try: item = items.AddItem(itemName, len(item_name_map) + 1) # 添加失败时 item 为 None if not items: raise RuntimeError('add item[{}] return None'.format(itemName)) except Exception as e: logger.error('添加点位-{}-失败! [点位可能不存在]'.format(itemName), exc_info=True) item_name_map[itemName] = None else: item.IsActive = True time.sleep(0.1) item_name_map[itemName] = item else: logger.debug('重复添加 itemName - {}'.format(itemName)) def add_items(opcServer, item_list: List[str]): """批量添加点位""" for item in item_list: add_item(opcServer, item) def _sync_read(item): data = item.Read(win32com.client.constants.OPCDevice, 0, 0, 0) if not data: raise ValueError('sync_read return None.') return data def sync_read_item(opcServer, itemName: str): """ 同步读取一个 opc 点位 :param opcServer: :param itemName: :return: (数据, 数据质量, 时间) """ if itemName not in item_name_map: add_item(opcServer, itemName) logger.debug('读取 - {}'.format(itemName)) item = item_name_map[itemName] try: if not item: raise ValueError('点位不存') return _sync_read(item) except Exception as e: logger.error('采集失败: {} - {}'.format(itemName, e), exc_info=True) return None, 0, 0 def sync_read_items(opcServer, itemNameList: List[str]): result = [] for itemName in itemNameList: if itemName not in item_name_map: add_item(opcServer, itemName) result.append(sync_read_item(opcServer, itemName)) return result if __name__ == '__main__': opcServer = connect_opc('Kepware.KEPServerEX.V6', '127.0.0.1') print(get_servers(opcServer)) tag_list = ['T1.T1.Tag1', 'T1.T1.Tag2', 'T1.T1.Tag3'] try: while True: data = sync_read_items(opcServer, tag_list) for i in data: print(i[:2]) time.sleep(2) except Exception: logger.error('异常退出', exc_info=True) finally: opcServer.Disconnect()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。