
[Python] OPC UA Communication with a FANUC Robot and Data Logging

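Introduction

OPC UA (Open Platform Communications Unified Architecture) is a cross-platform, open standard for data exchange that is widely used in industrial automation. Python's ease of use and rich library ecosystem make it a good choice for implementing OPC UA communication. This article shows how to use Python to communicate over OPC UA and record data from a FANUC robot in real time.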

Robot Simulation

FANUC robots can be simulated with the official RoboGuide software. Once the simulation is started, the default OPC UA address is 127.0.0.1:4880/FANUC/NanoUaServer
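As a small aid for checking the address above, the following sketch splits an OPC UA endpoint URL into host, port, and path using only the standard library. The helper name `split_endpoint` is hypothetical, and the `opc.tcp://` prefix is taken from the `SERVER_URL` used later in this article.

```python
from urllib.parse import urlparse


def split_endpoint(url: str):
    """Return (host, port, path) for an opc.tcp:// endpoint URL."""
    parts = urlparse(url)
    if parts.scheme != "opc.tcp":
        raise ValueError(f"not an opc.tcp endpoint: {url}")
    return parts.hostname, parts.port, parts.path


host, port, path = split_endpoint("opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer")
print(host, port, path)
```

If a connection attempt fails, confirming that this host and port are reachable (for example, that RoboGuide is running) is a reasonable first check.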

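Environment Setup

  • Python 3.6+ (the code uses f-strings and variable annotations)
  • opcua library: implements OPC UA communication
  • logging module (part of the standard library): writes the log

Install the opcua library: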

pip install opcua

Implementation

1. Import libraries

import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua

2. Set parameters

SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
CSV_FILENAME = 'fanuc_opcua_data.csv'
FAUNC_LOG = 'fanuc.log'
LOG_DIR = 'log'
BACKUP_DIR = 'backup'

3. Configure logging

def getLogger(filename: str):
    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)
    logger = logging.Logger(filename[:-4].upper(), logging.INFO)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")
    fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")
    fh.setFormatter(formatter)
    ch = logging.StreamHandler()
    ch.setFormatter(formatter)
    logger.addHandler(fh)
    logger.addHandler(ch)
    return logger
    
LOGGER = getLogger(FAUNC_LOG)
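`getLogger` above instantiates `logging.Logger` directly, so each call returns a new, unregistered logger. A possible variant, sketched below, goes through the standard `logging.getLogger` registry and only attaches handlers once. The name `build_logger` and the temporary demo directory are assumptions made for illustration; they are not part of the original script.

```python
import logging
import tempfile
from pathlib import Path


def build_logger(name: str, log_dir: str) -> logging.Logger:
    """Return a named logger that writes to <log_dir>/<name>.log and the console."""
    Path(log_dir).mkdir(parents=True, exist_ok=True)
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        formatter = logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s %(message)s"
        )
        file_handler = logging.FileHandler(
            Path(log_dir) / f"{name.lower()}.log", encoding="utf-8"
        )
        file_handler.setFormatter(formatter)
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
    return logger


# Demo in a temporary directory so the sketch leaves the working directory alone
demo_dir = tempfile.mkdtemp()
demo_logger = build_logger("FANUC_DEMO", demo_dir)
demo_logger.info("logger ready")
```

Calling `build_logger` again with the same name returns the same logger object, so messages are not written twice.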

4. OPC UA communication

  • Connect to the server
def connect_to_server(url):
    client = Client(url)
    client.connect()
    return client
  • Get the root node and the Objects node
def get_root_node(client: Client):
    return client.get_root_node()
def get_objects_node(client: Client):
    return client.get_objects_node()
  • Walk all child nodes and return the paths and values of the variable nodes
def get_variables(node: Node, path=""):
    variables = {}
    children: List[Node] = node.get_children()
    for child in children:
        new_path = path  # fallback so the except block can always log a path
        try:
            name: ua.QualifiedName = child.get_browse_name()
            new_path = f"{path}/{name.Name}"
            if child.get_node_class() == ua.NodeClass.Variable:
                value = child.get_value()
                if isinstance(value, list):
                    value = ','.join(str(x) for x in value)
                if isinstance(value, str):
                    value = value.replace('\n', '\\n').replace(',', ' ')
                variables[new_path] = value
            else:
                variables.update(get_variables(child, new_path))
        except Exception as e:
            LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")
    return variables
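The value handling inside `get_variables` can be looked at in isolation. The sketch below copies its two `isinstance` checks into a standalone helper (the name `to_csv_cell` is hypothetical) so the effect on different value types is easy to see without a server.

```python
def to_csv_cell(value):
    """Flatten one value the same way get_variables does before storing it."""
    if isinstance(value, list):
        value = ",".join(str(x) for x in value)
    if isinstance(value, str):
        # This branch also runs for joined lists, so their commas become spaces
        value = value.replace("\n", "\\n").replace(",", " ")
    return value


print(to_csv_cell([1, 2, 3]))
print(to_csv_cell("a\nb,c"))
print(to_csv_cell(42))
```

Because the two checks run one after the other, a list such as `[1, 2, 3]` is first joined with commas and then has those commas replaced, so it ends up in the CSV cell as `1 2 3`. Non-list, non-string values pass through unchanged.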

5. Back up the old CSV file

def backup_csv_file(filename):
    if not os.path.exists(BACKUP_DIR):
        os.makedirs(BACKUP_DIR)
    if os.path.exists(filename):
        modification_time = os.path.getmtime(filename)
        modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')
        new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"
        try:
            shutil.move(filename, new_filename)
            LOGGER.info(f"File moved to {new_filename}")
        except Exception as e:
            LOGGER.error(f"Failed to move file: {new_filename}, Error: {e}")
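`backup_csv_file` appends the timestamp after the extension, so a backup is named like `fanuc_opcua_data.csv_20240102030405`. If keeping the `.csv` suffix is preferable, one possible variant builds the name as follows. The helper `backup_name` is hypothetical and only computes the file name; it does not move anything.

```python
from datetime import datetime
from pathlib import Path


def backup_name(filename: str, modified: datetime) -> str:
    """Insert a timestamp between the file stem and its extension."""
    path = Path(filename)
    stamp = modified.strftime("%Y%m%d%H%M%S")
    return f"{path.stem}_{stamp}{path.suffix}"


print(backup_name("fanuc_opcua_data.csv", datetime(2024, 1, 2, 3, 4, 5)))
```

The date used here is an arbitrary example; in the script the value would come from `os.path.getmtime`, as in the function above.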

6. Main function

if __name__ == "__main__":
    client = None  # stays None if the connection attempt fails
    try:
        client = connect_to_server(SERVER_URL)
        root_node = get_root_node(client)
        objects_node = get_objects_node(client)
        backup_csv_file(CSV_FILENAME)
        with open(CSV_FILENAME, mode='w', newline='') as csvfile:
            num = 0
            while True:
                variables = get_variables(objects_node)
                if num == 0:
                    # Write the header once, using the first sample's keys as columns
                    writer = csv.DictWriter(csvfile, fieldnames=variables.keys())
                    writer.writeheader()
                writer.writerow(variables)
                csvfile.flush()
                num += 1
                LOGGER.info("Records written: " + str(num))
                time.sleep(1)
    except KeyboardInterrupt:
        print("Interrupted by user")
    finally:
        if client is not None:
            client.disconnect()
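The connect/disconnect handling in the main function can also be wrapped in a context manager, so that `disconnect()` is called only after a successful `connect()`. This is a minimal sketch: `opcua_session` and `FakeClient` are hypothetical names, and the only assumption about the client is that it exposes `connect()` and `disconnect()`, as `opcua.Client` is used in this article.

```python
from contextlib import contextmanager


@contextmanager
def opcua_session(client):
    """Connect the client, yield it, and always disconnect afterwards."""
    client.connect()
    try:
        yield client
    finally:
        client.disconnect()


class FakeClient:
    """Stand-in used only to show the call order without a real server."""

    def __init__(self):
        self.calls = []

    def connect(self):
        self.calls.append("connect")

    def disconnect(self):
        self.calls.append("disconnect")


fake = FakeClient()
try:
    with opcua_session(fake):
        raise KeyboardInterrupt  # simulate Ctrl+C inside the logging loop
except KeyboardInterrupt:
    pass
print(fake.calls)
```

With the real library the same pattern would read `with opcua_session(Client(SERVER_URL)) as client:` followed by the logging loop.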

Preview of the recorded data: [image not included]
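To show what the recorded file looks like without a robot, the sketch below runs the same `csv.DictWriter` pattern on an in-memory buffer. The sample dictionaries and the `write_samples` helper are made up for illustration. `restval` and `extrasaction="ignore"` are optional `DictWriter` settings added here as an assumption, so that a sample whose keys differ from the first one does not raise an error.

```python
import csv
import io


def write_samples(stream, samples):
    """Write dict samples as CSV rows; the first sample defines the columns."""
    writer = None
    count = 0
    for sample in samples:
        if writer is None:
            writer = csv.DictWriter(
                stream,
                fieldnames=list(sample.keys()),
                restval="",             # missing keys become empty cells
                extrasaction="ignore",  # unexpected keys are dropped
            )
            writer.writeheader()
        writer.writerow(sample)
        count += 1
    return count


buffer = io.StringIO(newline="")
rows = [{"/A": 1, "/B": 2}, {"/A": 3, "/C": 9}]
written = write_samples(buffer, rows)
print(buffer.getvalue())
```

The second sample lacks `/B` and carries an extra `/C`, so its row contains an empty cell for `/B` and the `/C` value is not written.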

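Summary

This article showed how to use Python to communicate over OPC UA and record data from a FANUC robot in real time. With the opcua library, we can easily connect to an OPC UA server.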


Complete code:

import csv
from datetime import datetime
import logging
import os
import shutil
import time
from typing import List
from opcua.common.node import Node
from opcua import Client, ua

# URL of the OPC UA server
SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
# CSV file name
CSV_FILENAME = "fanuc_opcua_data.csv"
# Log file
FAUNC_LOG = "fanuc.log"
# Directories
LOG_DIR = "log"
BACKUP_DIR = "backup"


def getLogger(filename: str):
    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)
    logger = logging.Logger(filename[:-4].upper(), logging.INFO)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")
    fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")
    fh.setFormatter(formatter)
    ch = logging.StreamHandler()
    ch.setFormatter(formatter)
    logger.addHandler(fh)
    logger.addHandler(ch)
    return logger


LOGGER = getLogger(FAUNC_LOG)


def connect_to_server(url):
    """Create a client instance and connect to the server."""
    client = Client(url)
    client.connect()
    return client


def get_root_node(client: Client):
    """Get the root node of the server's address space."""
    return client.get_root_node()


def get_objects_node(client: Client):
    """Get the server's Objects node."""
    return client.get_objects_node()


def get_node_key(node: Node, path: str):
    node_info = node.nodeid.to_string()
    pairs = node_info.split(";")
    data = {}
    if len(pairs) <= 1:
        data = {"ns": 2, "s": node_info}
    else:
        for pair in pairs:
            key, value = pair.split("=")
            data[key] = value

    key = f"ns={data.get('ns', 2)};s={path}"
    if data.get("i"):
        key += f";i={data.get('i')}"
    return key


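def get_variables(node: Node, path=""):
    """Walk all child nodes and return the paths and values of variable nodes."""
    variables = {}
    children: List[Node] = node.get_children()
    for child in children:
        new_path = path  # fallback so the except block can always log a path
        try: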
            name: ua.QualifiedName = child.get_browse_name()
            new_path = f"{path}/{name.Name}"
            if new_path.startswith("/Server") or new_path.startswith("/Modbus"):
                continue
            if child.get_node_class() == ua.NodeClass.Variable:
                value = child.get_value()
                if isinstance(value, list):
                    value = ",".join(str(x) for x in value)
                if isinstance(value, str):
                    value = value.replace("\n", "\\n").replace(",", " ")
                variables[get_node_key(child, new_path)] = value
            else:
                variables.update(get_variables(child, new_path))
        except Exception as e:
            LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")
    return variables


def backup_csv_file(filename):
    """Back up the CSV file if it already exists."""
    if not os.path.exists(BACKUP_DIR):
        os.makedirs(BACKUP_DIR)
    if os.path.exists(filename):
        modification_time = os.path.getmtime(filename)
        modification_time_str = datetime.fromtimestamp(modification_time).strftime(
            "%Y%m%d%H%M%S"
        )
        new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"
        try:
            shutil.move(filename, new_filename)
            LOGGER.info(f"File moved to {new_filename}")
        except Exception as e:
            LOGGER.error(f"Failed to move file: {new_filename}, Error: {e}")


if __name__ == "__main__":
    client = None  # stays None if the connection attempt fails
    try:
        client = connect_to_server(SERVER_URL)
        root_node = get_root_node(client)
        objects_node = get_objects_node(client)

        backup_csv_file(CSV_FILENAME)

        with open(CSV_FILENAME, mode="w", newline="") as csvfile:
            num = 0

            while True:
                variables = get_variables(objects_node)

                if num < 1:
                    # Write the header once, using the first sample's keys as columns
                    writer = csv.DictWriter(csvfile, fieldnames=variables.keys())
                    writer.writeheader()

                writer.writerow(variables)
                csvfile.flush()
                num += 1
                LOGGER.info("Records written: " + str(num))
                time.sleep(1)

    except KeyboardInterrupt:
        print("Interrupted by user")
    finally:
        if client is not None:
            client.disconnect()
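The complete code labels each CSV column through `get_node_key`, which is not covered in the walkthrough. The sketch below restates its string handling as a standalone function so the resulting labels can be inspected. The name `make_column_key` is hypothetical, the NodeId strings are illustrative examples rather than values read from a FANUC server, and `str.partition` is used in place of `split("=")` so that an identifier containing `=` does not raise an error.

```python
def make_column_key(node_id_text: str, path: str) -> str:
    """Build a column label from a NodeId string such as 'ns=2;s=foo'."""
    fields = {}
    pairs = node_id_text.split(";")
    if len(pairs) > 1:
        for pair in pairs:
            # Split on the first '=' only, so values containing '=' survive
            name, _, value = pair.partition("=")
            fields[name] = value
    # As in get_node_key, the namespace index defaults to 2 when none is parsed
    key = f"ns={fields.get('ns', 2)};s={path}"
    if fields.get("i"):
        key += f";i={fields['i']}"
    return key


print(make_column_key("ns=2;s=foo", "/A/B"))
print(make_column_key("ns=1;i=1234", "/A"))
print(make_column_key("i=85", "/Objects"))
```

As in the original function, a NodeId string without a `;` separator falls back to `ns=2` and its own identifier is not included in the label.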