赞
踩
OPC UA(Open Platform Communications Unified Architecture)是一种跨平台的、开放的数据交换标准,常用于工业自动化领域。Python因其易用性和丰富的库支持,成为实现OPC UA通信的不错选择。本文将介绍如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。
FANUC机器人可以使用官方软件RoboGuide进行机器人仿真,启动后默认OPC UA地址为127.0.0.1:4880/FANUC/NanoUaServer
。
安装opcua库:
pip install opcua
import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua
SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
CSV_FILENAME = 'fanuc_opcua_data.csv'
FAUNC_LOG = 'fanuc.log'
LOG_DIR = 'log'
BACKUP_DIR = 'backup'
def getLogger(filename: str):
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
logger = logging.Logger(filename[:-4].upper(), logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")
fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")
fh.setFormatter(formatter)
ch = logging.StreamHandler()
ch.setFormatter(formatter)
logger.addHandler(fh)
logger.addHandler(ch)
return logger
LOGGER = getLogger(FAUNC_LOG)
def connect_to_server(url):
client = Client(url)
client.connect()
return client
def get_root_node(client: Client):
return client.get_root_node()
def get_objects_node(client: Client):
return client.get_objects_node()
def get_variables(node: Node, path=""): variables = {} children: List[Node] = node.get_children() for child in children: try: name: ua.QualifiedName = child.get_browse_name() new_path = f"{path}/{name.Name}" if child.get_node_class() == ua.NodeClass.Variable: value = child.get_value() if isinstance(value, list): value = ','.join(str(x) for x in value) if isinstance(value, str): value = value.replace('\n', '\\n').replace(',', ' ') variables[new_path] = value else: variables.update(get_variables(child, new_path)) except Exception as e: LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}") return variables
def backup_csv_file(filename):
if not os.path.exists(BACKUP_DIR):
os.makedirs(BACKUP_DIR)
if os.path.exists(filename):
modification_time = os.path.getmtime(filename)
modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')
new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"
try:
shutil.move(filename, new_filename)
LOGGER.info(f"文件已移动到 {new_filename}")
except Exception as e:
LOGGER.error(f"移动文件出错: {new_filename}, Error: {e})
if __name__ == "__main__": try: client = connect_to_server(SERVER_URL) root_node = get_root_node(client) objects_node = get_objects_node(client) backup_csv_file(CSV_FILENAME) with open(CSV_FILENAME, mode='w', newline='') as csvfile: num = 0 while True: variables = get_variables(objects_node) if num == 1: writer = csv.DictWriter(csvfile, fieldnames=variables.keys()) writer.writeheader() writer.writerow(variables) csvfile.flush() num += 1 LOGGER.info("数据记录:" + str(num)) time.sleep(1) except KeyboardInterrupt: print("程序被用户中断") finally: client.disconnect()
记录数据预览:
本文介绍了如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。通过使用opcua库,我们可以轻松地连接到OPC UA
完整代码:
import csv from datetime import datetime import logging import os import shutil import time from typing import List from opcua.common.node import Node from opcua import Client, ua # OPC UA服务器的URL SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer" # CSV文件名 CSV_FILENAME = "fanuc_opcua_data.csv" # 日志文件 FAUNC_LOG = "fanuc.log" # 文件夹 LOG_DIR = "log" BACKUP_DIR = "backup" def getLogger(filename: str): if not os.path.exists(LOG_DIR): os.makedirs(LOG_DIR) logger = logging.Logger(filename[:-4].upper(), logging.INFO) formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s") fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a") fh.setFormatter(formatter) ch = logging.StreamHandler() ch.setFormatter(formatter) logger.addHandler(fh) logger.addHandler(ch) return logger LOGGER = getLogger(FAUNC_LOG) def connect_to_server(url): """创建客户端实例并连接到服务端""" client = Client(url) client.connect() return client def get_root_node(client: Client): """获取服务器命名空间中的根节点""" return client.get_root_node() def get_objects_node(client: Client): """获取服务器的对象节点""" return client.get_objects_node() def get_node_key(node: Node, path: str): node_info = node.nodeid.to_string() pairs = node_info.split(";") data = {} if len(pairs) <= 1: data = {"ns": 2, "s": node_info} else: for pair in pairs: key, value = pair.split("=") data[key] = value key = f"ns={data.get('ns', 2)};s={path}" if data.get("i"): key += f";i={data.get('i')}" return key def get_variables(node: Node, path=""): """遍历所有子节点并返回变量节点的路径和数值""" variables = {} children: List[Node] = node.get_children() for child in children: try: name: ua.QualifiedName = child.get_browse_name() new_path = f"{path}/{name.Name}" if new_path.startswith("/Server") or new_path.startswith("/Modbus"): continue if child.get_node_class() == ua.NodeClass.Variable: value = child.get_value() if isinstance(value, list): value = ",".join(str(x) for x in value) if isinstance(value, str): value = value.replace("\n", "\\n").replace(",", " ") variables[get_node_key(child, new_path)] = value else: variables.update(get_variables(child, new_path)) except Exception as e: LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}") return variables def backup_csv_file(filename): """如果CSV文件已存在则备份""" if not os.path.exists(BACKUP_DIR): os.makedirs(BACKUP_DIR) if os.path.exists(filename): modification_time = os.path.getmtime(filename) modification_time_str = datetime.fromtimestamp(modification_time).strftime( "%Y%m%d%H%M%S" ) new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}" try: shutil.move(filename, new_filename) LOGGER.info(f"文件已移动到 {new_filename}") except Exception as e: LOGGER.error(f"移动文件出错: {new_filename}, Error: {e}") if __name__ == "__main__": try: client = connect_to_server(SERVER_URL) root_node = get_root_node(client) objects_node = get_objects_node(client) backup_csv_file(CSV_FILENAME) with open(CSV_FILENAME, mode="w", newline="") as csvfile: num = 0 while True: variables = get_variables(objects_node) if num < 1: writer = csv.DictWriter(csvfile, fieldnames=variables.keys()) writer.writeheader() writer.writerow(variables) csvfile.flush() num += 1 LOGGER.info("数据记录:" + str(num)) time.sleep(1) except KeyboardInterrupt: print("程序被用户中断") finally: client.disconnect()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。