赞
踩
通过Python自动操作交换机
文件一:Main_Switch.py 操作示例
文件二:DEF_SSH_eNSP_Switch_S5700.py 通过SSH交互控制交换机完成任务
文件三:DEF_SNMP_eNSP_Switch_S5700.py 通过SNMP获取交换机信息
文件四:DEV_SSH.py 操作交换机的SSH驱动部分
文件五:DEV_SNMP.py 操作交换机SNMP驱动部分,目前这个需要Linux环境
文件一代码:
- #_*_ coding:utf8 _*_
- from DEF_SNMP_eNSP_Switch_S5700 import *
- from DEF_SSH_eNSP_Switch_S5700 import *
-
- import logging # 日志模块
- Log = logging.getLogger('__name__') # 获取实例
- formatter = logging.Formatter('%(asctime)s %(levelname)-8s: %(message)s') # 指定logger输出格式
- file_handler = logging.FileHandler('Main_Switch.log') # 日志文件路径
- file_handler.setFormatter(formatter) # 可以通过setFormatter指定输出格式
- Log.addHandler(file_handler) # 为logger添加的日志处理器
- # 设置记录的日志级别
- #Log.setLevel(logging.DEBUG)
- Log.setLevel(logging.INFO)
- #Log.setLevel(logging.WARNING)
- #Log.setLevel(logging.ERROR)
- #Log.setLevel(logging.CRITICAL)
-
-
-
- ## 测试 SNMP 功能
-
- '''
- ## SNMP 功能由于 DEV_SNMP.py 是根据Linux写的,需要Linux环境
- SNMP_PWD = 'pwd@123'
- SNMP_HOST = '192.168.56.2'
- SNMP_Ver = '2c'
- ## 查询设备IP和MAC绑定信息,生成字典形式
- R = SNMP_生成以IP为键以MAC地址列表为值的字典(SNMP_PWD, SNMP_HOST)
- if R[0] == 0:
- D_IP_MAC = R[1]
- INFO = '成功,IP和MAC绑定信息字典:' + str(D_IP_MAC)
- Log.info(INFO)
- print(INFO)
- else:
- ERROR = '失败,错误信息:'+ R[1]
- Log.error(ERROR)
- print(ERROR)
- ## 查询设备VLAN接口的IP地址信息,生成字典形式
- R = SNMP_生成以VLAN为键以VLAN_IP_MASK列表为值的字典(SNMP_PWD, SNMP_HOST)
- if R[0] == 0:
- D_VLAN_NET = R[1]
- INFO = '成功,VLAN接口和IP信息字典:' + str(D_VLAN_NET)
- Log.info(INFO)
- print(INFO)
- else:
- ERROR = '失败,错误信息:'+ R[1]
- Log.error(ERROR)
- print(ERROR)
- '''
-
-
-
-
- ## 测试 SSH 功能
-
- # 每台SSH设备的登录信息
- D_DEV0 = {}
- D_DEV0['SSH_USER'] = 'test'
- D_DEV0['SSH_PASS'] = 'pwd@123'
- D_DEV0['SSH_IP'] = '192.168.0.1'
- D_DEV0['SSH_PORT'] = 2001
-
- D_DEV1 = {}
- D_DEV1['SSH_USER'] = 'test'
- D_DEV1['SSH_PASS'] = 'pwd@123'
- D_DEV1['SSH_IP'] = '192.168.1.1'
-
- D_DEV2 = {}
- D_DEV2['SSH_USER'] = 'test'
- D_DEV2['SSH_PASS'] = 'pwd@123'
- D_DEV2['SSH_IP'] = '192.168.56.2'
-
-
- ## 全部SSH设备组成列表(顺序无所谓)
- L_DEVS = [D_DEV0, D_DEV1, D_DEV2]
-
-
- ## 制作选提示信息
- 设备列表 = '''
- 可以登录的设备列表
- '''
- for i in range(0, len(L_DEVS)):
- 设备列表 += '[ '+ str(i) + ' ] 选择 ' + L_DEVS[i]['SSH_IP'] + ' 设备进行操作\n'
- 设备列表 += '[ q ] 退出选择设备\n请选择设备:'
-
- #print(设备列表)
- '''
- 可以登录的设备列表
- [ 0 ] 选择 192.168.0.1 设备进行操作
- [ 1 ] 选择 192.168.1.1 设备进行操作
- [ 2 ] 选择 192.168.56.2 设备进行操作
- [ q ] 退出选择设备
- '''
-
-
- ## 制作业务功能选择列表
- 业务列表 = '''=== 功能菜单 ===
- [ 0 ] 保存配置
- [ 1 ] 查看全部IP和MAC绑定信息
- [ 2 ] 绑定指定IP和指定MAC
- [ 3 ] 在指定网段自动选择可用IP和指定MAC绑定
- [ 4 ] 查找MAC地址所在网段
- [ 5 ] 在MAC所在网段自动选择可用IP和进行绑定
- [ 6 ] 解除指定IP和指定MAC的绑定
- [ 7 ] 解除指定IP的全部MAC绑定
- [ 8 ] 解除指定MAC的全部IP绑定
- [ e ] 改变登录设备
- [ q ] 退出功能程序
- === 功能菜单 ===
- 请输入功能编号:'''
-
-
-
- ## 循环交互操作
- while 1:
- if '字典_登录信息' in locals().keys():
- print("【提示】当前已经缓存", 字典_登录信息['SSH_IP'], "登录信息")
- print()
- else:
- 目标设备 = input(设备列表)
- INDEX = [str(i) for i in range(0, len(L_DEVS))]
- if 目标设备 in INDEX:
- 字典_登录信息 = L_DEVS[int(目标设备)]
- print("【提示】当前已经选择", 字典_登录信息['SSH_IP'], "登录信息")
- print()
- elif 目标设备 == 'q':
- print("用户退出选择")
- break
- else:
- print("无效输入,请重新选择")
- print()
- while 1:
- 目标设备 = input(设备列表)
- INDEX = [str(i) for i in range(0, len(L_DEVS))]
- if 目标设备 in INDEX:
- 字典_登录信息 = L_DEVS[int(目标设备)]
- print("【提示】当前已经选择", 字典_登录信息['SSH_IP'], "登录信息")
- print()
- elif 目标设备 == 'q':
- print("用户退出选择")
- break
- break
-
- 业务选择 = input(业务列表)
- # 0 保存配置
- if 业务选择 == '0':
- T0 = time.time()
- print("正在执行 保存配置 请稍后")
- R = 保存配置(字典_登录信息)
- T1 = time.time()
- if R[0] == 0:
- INFO = R[1]
- Log.info(INFO)
- print(INFO)
- else:
- ERROR = R[1]
- Log.error(ERROR)
- print(ERROR)
- print("耗时 %.1f 秒" % (T1-T0))
- print()
- # 1 查看全部IP和MAC绑定信息
- elif 业务选择 == '1':
- T0 = time.time()
- print("正在执行 查看全部IP和MAC绑定信息 请稍后")
- R = 获取IP和MAC绑定信息(字典_登录信息)
- T1 = time.time()
- if R[0] == 0:
- print("成功")
- D_IP_MAC = R[1]
- if D_IP_MAC == {}:
- print("当前无绑定记录")
- else:
- print("IP\t\t绑定MAC列表")
- for K in D_IP_MAC:
- print(K, "\t", D_IP_MAC[K])
- else:
- print("失败")
- print("失败原因", R[1])
- print("耗时 %.1f 秒" % (T1-T0))
- print()
- # 2 绑定指定IP和指定MAC
- elif 业务选择 == '2':
- Bind_IP = input("输入要绑定的IP地址(如192.168.0.1):")
- Bind_MAC = input("要绑定的MAC地址(任意mac地址写法):")
- T0 = time.time()
- print("正在执行 绑定指定IP和指定MAC 请稍后")
- R = 绑定指定IP和指定MAC(字典_登录信息, Bind_IP, Bind_MAC)
- T1 = time.time()
- if R[0] == 0:
- print("成功")
- print("成功信息", R[1])
- INFO = R[1]
- Log.info(INFO)
- print(INFO)
- else:
- print("失败")
- print("失败原因", R[1])
- ERROR = R[1]
- Log.error(ERROR)
- print("耗时 %.1f 秒" % (T1-T0))
- print()
- # 3在指定网段自动选择可用IP和指定MAC绑定
- elif 业务选择 == '3':
- Bind_NET = input("要绑定的网段(如192.168.0.1/24 或 192.168.0.1/255.255.255.0: ")
- Bind_MAC = input("要绑定的MAC地址(任意mac地址写法):")
- T0 = time.time()
- print("正在执行 在指定网段自动选择可用IP和指定MAC绑定 请稍后")
- R = 在指定网段自动选择可用IP和指定MAC绑定(字典_登录信息, Bind_NET, Bind_MAC)
- T1 = time.time()
- if R[0] == 0:
- print("成功")
- print("成功信息", R[1])
- INFO = R[1]
- Log.info(INFO)
- print(INFO)
- else:
- print("失败")
- print("失败原因", R[1])
- ERROR = R[1]
- Log.error(ERROR)
- print("耗时 %.1f 秒" % (T1-T0))
- print()
- # 4 查找MAC地址所在网段
- elif 业务选择 == '4':
- MAC = input("要查找的MAC地址(任意mac地址写法):")
- T0 = time.time()
- print("正在执行 查找MAC地址所在网段 请稍后")
- R = 查找MAC地址所在网段(字典_登录信息, MAC)
- T1 = time.time()
- if R[0] == 0:
- print("成功")
- print("成功信息", R[1])
- INFO = R[1]
- Log.info(INFO)
- print(INFO)
- else:
- print("失败")
- print("失败原因", R[1])
- ERROR = R[1]
- Log.error(ERROR)
- print("耗时 %.1f 秒" % (T1-T0))
- print()
- # 5 在MAC所在网段自动选择可用IP和进行绑定
- elif 业务选择 == '5':
- Bind_MAC = input("要绑定的MAC地址(任意mac地址写法):")
- T0 = time.time()
- print("正在执行 在MAC所在网段自动选择可用IP和进行绑定 请稍后")
- R = 在MAC所在网段自动选择可用IP和进行绑定(字典_登录信息, Bind_MAC)
- T1 = time.time()
- if R[0] == 0:
- print("成功")
- print("成功信息", R[1])
- INFO = R[1]
- Log.info(INFO)
- print(INFO)
- else:
- print("失败")
- print("失败原因", R[1])
- ERROR = R[1]
- Log.error(ERROR)
- print("耗时 %.1f 秒" % (T1-T0))
- print()
- # 6 解除指定IP和指定MAC的绑定
- elif 业务选择 == '6':
- Undo_Bind_IP = input("要解除绑定的IP地址(如192.168.0.1):")
- Undo_Bind_MAC = input("要解除绑定的MAC地址(任意mac地址写法):")
- T0 = time.time()
- print("正在执行 解除指定IP和指定MAC的绑定 请稍后")
- R = 解除指定IP和指定MAC的绑定(字典_登录信息, Undo_Bind_IP, Undo_Bind_MAC)
- T1 = time.time()
- if R[0] == 0:
- print("成功")
- print("成功信息", R[1])
- INFO = R[1]
- Log.info(INFO)
- print(INFO)
- else:
- print("失败")
- print("失败原因", R[1])
- ERROR = R[1]
- Log.error(ERROR)
- print("耗时 %.1f 秒" % (T1-T0))
- print()
- # 7 解除指定IP的全部MAC绑定
- elif 业务选择 == '7':
- Undo_Bind_IP = input("要解除绑定的IP地址(如192.168.0.1):")
- T0 = time.time()
- print("正在执行 解除指定IP的全部MAC绑定 请稍后")
- R = 解除指定IP的全部MAC绑定(字典_登录信息, Undo_Bind_IP)
- T1 = time.time()
- if R[0] == 0:
- print("成功")
- print("成功信息", R[1])
- INFO = R[1]
- Log.info(INFO)
- print(INFO)
- else:
- print("失败")
- print("失败原因", R[1])
- ERROR = R[1]
- Log.error(ERROR)
- print("耗时 %.1f 秒" % (T1-T0))
- print()
- # 8 解除指定MAC的全部IP绑定
- elif 业务选择 == '8':
- Undo_Bind_MAC = input("要解除绑定的MAC地址(任意mac地址写法):")
- T0 = time.time()
- print("正在执行 解除指定MAC的全部IP绑定 请稍后")
- R = 解除指定MAC的全部IP绑定(字典_登录信息, Undo_Bind_MAC)
- T1 = time.time()
- if R[0] == 0:
- print("成功")
- print("成功信息", R[1])
- INFO = R[1]
- Log.info(INFO)
- print(INFO)
- else:
- print("失败")
- print("失败原因", R[1])
- ERROR = R[1]
- Log.error(ERROR)
- print("耗时 %.1f 秒" % (T1-T0))
- print()
- elif 业务选择 == 'e':
- print()
- print("【提示】修改登录设备")
- while 1:
- 目标设备 = input(设备列表)
- INDEX = [str(i) for i in range(0, len(L_DEVS))]
- if 目标设备 in INDEX:
- 字典_登录信息 = L_DEVS[int(目标设备)]
- print("【提示】当前重新选择", 字典_登录信息['SSH_IP'], "登录信息")
- print()
- break
- elif 目标设备 == 'q':
- print("用户退出选择")
- break
- else:
- print("无效输入,请重新选择")
- elif 业务选择 == 'q':
- print("用户正常退出程序")
- break
- else:
- print("无效选择,重新选择")
文件二代码:
- #_*_ coding:utf8 _*_
- from DEV_SSH import *
- import struct
- import re
- import logging # 日志模块
- import sys
-
- Log = logging.getLogger("__name__")
- if not logging.FileHandler:
- # 指定logger输出格式
- formatter = logging.Formatter('%(asctime)s %(levelname)-8s : %(message)s')
- # 文件日志
- file_handler = logging.FileHandler("log")
- file_handler = logging.FileHandler("DEF_eNSP_Switch_S5700.log") # 日志文件路径
- file_handler.setFormatter(formatter) # 可以通过setFormatter指定输出格式
- Log.addHandler(file_handler)
- Log.setLevel(logging.DEBUG)
-
-
-
-
-
- ###############################################################################################################################
- ## 检查IP是否合法
- ## 参数 IP_STR 点分十进制字符串,例:'192.168.0.1'
- def TEST_IP_STR(IP_STR):
- if type(IP_STR) == str:
- L_IP = IP_STR.split('.') ## 以点分割,各元素组成列表
- if len(L_IP) == 4:
- try: ## 尝试把4段内容转成数字
- IP_0 = int(L_IP[0])
- IP_1 = int(L_IP[1])
- IP_2 = int(L_IP[2])
- IP_3 = int(L_IP[3])
- except:
- ERROR = '函数 TEST_IP_STR() 参数 IP_STR 以点分成4段中有不能转成数字的部分'
- return(1, ERROR)
- else: ## 4段值各自转成数字成功后判断每个数字的取值范围
- if 0<= IP_0 <=255 and 0<= IP_1 <=255 and 0<= IP_2 <=255 and 0<= IP_3 <=255:
- INFO = '函数 TEST_IP_STR() 参数 IP_STR 检测合格,变量值 = ' + IP_STR
- return(0, INFO)
- else:
- ERROR = '函数 TEST_IP_STR() 参数 IP_STR 以点分成4段中有超出0-255范围的值'
- return(1, ERROR)
- else:
- ERROR = '函数 TEST_IP_STR() 参数 IP_STR 不能以点分成4段'
- return(1, ERROR)
- else:
- ERROR = '函数 TEST_IP_STR() 参数 IP_STR 类型不是字符串'
- return(1, ERROR)
- ###############################################################################################################################
-
-
- ###############################################################################################################################
- ## 检查登录信息字典
- ## 参数 D_LOGIN_INFO 帐号、密码、地址、端口组成的字典
- ## 如果无端口号设置为默认22端口(会修改字典)
- def TEST_D_LOGIN_INFO(D_LOGIN_INFO):
- if type(D_LOGIN_INFO) != dict:
- E = '函数 TEST_D_LOGIN_INFO 参数 D_LOGIN_INFO 必须是字典类型'
- return(1, E)
-
- if 'SSH_USER' not in D_LOGIN_INFO:
- E = '函数 TEST_D_LOGIN_INFO 参数 D_LOGIN_INFO 中 KEY SSH_USER 不存在(没有登录帐号信息)'
- return(1, E)
- else:
- 登录帐号 = D_LOGIN_INFO['SSH_USER']
- if type(登录帐号) != str:
- E = '函数 TEST_D_LOGIN_INFO 参数 D_LOGIN_INFO 中 KEY SSH_USER 的值(登录帐号)不是字符串'
- return(1, E)
-
- if 'SSH_PASS' not in D_LOGIN_INFO:
- E = '函数 TEST_D_LOGIN_INFO 参数 D_LOGIN_INFO 中 KEY SSH_PASS 不存在(没有登录密码)'
- return(1, E)
- else:
- 登录密码 = D_LOGIN_INFO['SSH_PASS']
- if type(登录密码) != str:
- E = '函数 TEST_D_LOGIN_INFO 参数 D_LOGIN_INFO 中 KEY SSH_PASS 的值(登录密码)不是字符串'
- return(1, E)
-
- if 'SSH_IP' not in D_LOGIN_INFO:
- E = '函数 TEST_D_LOGIN_INFO 参数 D_LOGIN_INFO 中 KEY SSH_IP 不存在(没有登录IP地址)'
- return(1, E)
- else:
- IP_STR = D_LOGIN_INFO['SSH_IP']
- R = TEST_IP_STR(IP_STR) ## 调用 TEST_IP_STR() 检查IP是否合法
- if R[0] != 0:
- E = '函数 TEST_D_LOGIN_INFO 参数 D_LOGIN_INFO 中 KEY SSH_IP 的值(登录IP地址)IP格式错误 ' + R[1]
- return(1, E)
-
- if 'SSH_PORT' not in D_LOGIN_INFO:
- D_LOGIN_INFO['SSH_PORT'] = 22
- INFO = '函数 TEST_D_LOGIN_INFO 参数 D_LOGIN_INFO 中 KEY SSH_PORT 不存在(没有登录端口号)自动设置为默认22端口号'
- return(0, INFO) ## 修改为默认端口号后全部检查合格
- else:
- PORT = D_LOGIN_INFO['SSH_PORT']
- if type(PORT) != int:
- E = '函数 TEST_D_LOGIN_INFO 参数 D_LOGIN_INFO 中 KEY SSH_PORT 的值(登录端口号)不是数字'
- return(1, E)
- else:
- if 0< PORT <65535:
- INFO = '函数 TEST_D_LOGIN_INFO 参数 D_LOGIN_INFO 中 KEY SSH_PORT 的值(登录端口号)在(0,65535)区间内,检测合格,变量值 = ' + str(PORT)
- return(0, INFO) ## 全部检查合格
- else:
- E = '函数 TEST_D_LOGIN_INFO 参数 D_LOGIN_INFO 中 KEY SSH_PORT 的值(登录端口号)数值不在(0,65535)区间内'
- return(1, E)
- ###############################################################################################################################
-
-
- ###############################################################################################################################
- ## 尝试把掩码转成掩码位数
- ## MASK 参数类型:字符串('24' 或 '255.255.255.0' 样式)
- ## MASK 参数类型:数字(0到32)
- ## 成功返回 (0, 掩码位数(数字))
- ## 失败返回 (1, 错误信息)
- def TRY_MASK_2_MASK_INT(MASK):
- ## 字符串掩码格式和掩码长度对应的字典,方便快速查找
- D_MASK = {
- '0.0.0.0':0,
- '128.0.0.0':1,'192.0.0.0':2,'224.0.0.0':3,'240.0.0.0':4,'248.0.0.0':5,'252.0.0.0':6,'254.0.0.0':7,'255.0.0.0':8,
- '255.128.0.0':9,'255.192.0.0':10,'255.224.0.0':11,'255.240.0.0':12,'255.248.0.0':13,'255.252.0.0':14,'255.254.0.0':15,'255.255.0.0':16,
- '255.255.128.0':17,'255.255.192.0':18,'255.255.224.0':19,'255.255.240.0':20,'255.255.248.0':21,'255.255.252.0':22,'255.255.254.0':23,'255.255.255.0':24,
- '255.255.255.128':25,'255.255.255.192':26,'255.255.255.224':27,'255.255.255.240':28,'255.255.255.248':29,'255.255.255.252':30,'255.255.255.254':31,
- '255.255.255.255':32}
-
- if type(MASK) == int: ## 如果是数字类型,后续再判断数值是否在掩码范围内
- if 0 <= MASK <= 32: ## 如果掩码数值在0到32内
- return(0, MASK) ## 返回成功状态码0和掩码数值
- else:
- ERROR = 'ERROR 变量范围:掩码只能在0到32范围内'
- return(1, ERROR)
- elif type(MASK) == str: ## 如果是字符串类型,可能是数字掩码,可能是字符串类型的数字掩码
- try:
- ## 尝试把参数MASK转成数字类型
- MASK_INT = int(MASK) ## int() 函数可以自动删除字符串类型数字的前后空格
- except:
- ## 不能转换成数字可能是点分十格式掩码的字符串
- L_MASK = MASK.split('.') ## 以点分割,各元素组成列表
- if len(L_MASK) == 4: ## 如果能分成4段,说明是点分十格式掩码字符串
- if MASK in D_MASK: ## 如果这个点分十格式掩码字符串存在于掩码和掩码位数字典中
- MASK_INT = D_MASK[MASK] ## 获取点分十格式掩码字符串对应的掩码数值
- return(0, MASK_INT) ## 返回成功状态码0和掩码数值
- else:
- ERROR = 'ERROR 变量范围:掩码超出有效范围'
- return(1, ERROR)
- else: ## 如果不能分成4段,格式错误
- ERROR = 'ERROR 变量格式:掩码格式错误'
- return(1, ERROR)
- else: ## 可以转成数字
- RR = TRY_MASK_2_MASK_INT(MASK_INT) ## 调用自身函数处理新的数字类型参数
- return(RR) ## 返回自身处理结果给上级函数
- else:
- ERROR = 'ERROR 参数类型:只能是数字或字符串或字符串类型的数字'
- return(1, ERROR)
- ###############################################################################################################################
-
-
- ## IP_INT 转 IP_STR
- ## IP_INT(4字节无符号整数类型IP地址)
- ## IP_STR(点分十进制字符串类型IP地址)
- def IP_INT_2_IP_STR(IP_INT):
- IP_Byte = struct.pack('>I', IP_INT) # 打包成4字节无符号整数(大端/网络端)
- 元组_IP_数字 = struct.unpack('BBBB', IP_Byte)
- ## 每个数字转成字符串并加上'.'拼成点分十形式字符串
- #IP_STR = str(元组_IP_数字[0]) +'.'+ str(元组_IP_数字[1]) +'.'+ str(元组_IP_数字[2]) +'.'+ str(元组_IP_数字[3]) ## Python2用这个写法
- IP_STR = f'{str(元组_IP_数字[0])}.{str(元组_IP_数字[1])}.{str(元组_IP_数字[2])}.{str(元组_IP_数字[3])}' ## Python3可用的新写法
- return(IP_STR)
-
-
- ## IP_STR 转 IP_INT
- ## IP_STR(点分十进制字符串类型IP地址)
- ## IP_INT(4字节无符号整数类型IP地址)
- def IP_STR_2_IP_INT(IP_STR):
- IP分段 = IP_STR.split('.')
- B_IP = struct.pack('BBBB', int(IP分段[0]), int(IP分段[1]), int(IP分段[2]), int(IP分段[3])) # 4个字节数字按顺序拼成4字节字节码
- T_IP_INT = struct.unpack('>I', B_IP) # 把拼成的4字节字节码转成大端格式的4字节无符号整数
- IP_INT = T_IP_INT[0] # 打包成字节返回是元组,第一个元素是打包的结果
- return(IP_INT)
-
-
- ## 根据地址和掩码计算起始和结束地址
- ## 返回(NET_IP_MIN, NET_IP_MAX)
- ## NET_IP_MIN(网段首地址/网段号)
- ## NET_IP_MAX(网段末地址/广播号)
- def 根据地址和掩码计算起始和结束地址(IP_STR, MASK_INT):
- 主机位数 = 32 - MASK_INT
- 主机数量 = 2**主机位数
- IP_INT = IP_STR_2_IP_INT(IP_STR)
- 位移操作缓存 = IP_INT >> 主机位数
- IP_NET_INT = 位移操作缓存 << 主机位数
- NET_IP_MIN = IP_NET_INT ## 每个网段的首IP地址为网段号
- NET_IP_MAX = IP_NET_INT + 主机数量 -1 ## 每个网段的末IP地址为广播号(以0为第一个,最后一个要总数-1)
- return(NET_IP_MIN, NET_IP_MAX)
-
-
- ## 根据IP地址和掩码位数生成IP所在网段的全部IP_INT地址列表(列表从小到大顺序)
- ## IP_INT(4字节无符号整数类型IP地址)
- ## IP_STR(点分十进制字符串类型IP地址)
- ## MASK_INT(掩码长度数字,范围0-32)
- def IP_MASK_2_IP_INT_LIST(IP_STR, MASK_INT):
- 主机位数 = 32 - MASK_INT ## 32位的IPv4地址由网络位部分和主机位部分组成
- 主机数量 = 2**主机位数 ## 主机数量由主机位的比特个数可以组成数字的全部可能组合
- IP_INT = IP_STR_2_IP_INT(IP_STR) ## (点分十进制字符串类型IP地址)转成(4字节无符号整数类型IP地址)
- 位移操作缓存 = IP_INT >> 主机位数 ## 先往右移主机个数位,清除主机位上的值
- IP_NET_INT = 位移操作缓存 << 主机位数 ## 再往左移主机个数位,填充主机位上的值为0,得到IP的所在段的网络号(网段中第一个IP地址)对应的4字节数字值
- IP_INT_LIST = [i+IP_NET_INT for i in range(0,主机数量)] ## 生成IP所在网段的全部IP地址
- return(IP_INT_LIST) ## 返回网段全部IP地址
-
-
- ## 根据IP地址和掩码位数生成IP所在网段的全部IP_STR地址列表(列表从小到大顺序)
- ## IP_STR(点分十进制字符串类型IP地址)
- ## MASK_INT(掩码长度数字,范围0-32)
- def IP_MASK_2_IP_STR_LIST(IP_STR, MASK_INT):
- 主机位数 = 32 - MASK_INT
- 主机数量 = 2**主机位数
- IP_INT = IP_STR_2_IP_INT(IP_STR)
- 位移操作缓存 = IP_INT >> 主机位数
- IP_NET_INT = 位移操作缓存 << 主机位数
- IP_STR_LIST = []
- for i in range(0,主机数量):
- R = IP_INT_2_IP_STR(IP_NET_INT + i) ## IP的数字值转成IP点分十格式字符串
- IP_STR_LIST.append(R)
- return(IP_STR_LIST)
-
-
- ##################################################################################
- ## 功能函数 MAC缓存信息转成MAC对应VLAN字典()
- ## 把如下字符串内容
- ## <Huawei>display mac-address
- ## MAC address table of slot 0:
- ## -------------------------------------------------------------------------------
- ## MAC Address VLAN/ PEVLAN CEVLAN Port Type LSP/LSR-ID
- ## VSI/SI MAC-Tunnel
- ## -------------------------------------------------------------------------------
- ## 0a00-2700-0010 1 - - GE0/0/1 dynamic 0/-
- ## 5489-9836-1256 1 - - GE0/0/2 dynamic 0/-
- ## 5489-9836-1256 2 - - GE0/0/3 dynamic 0/-(缓存过期前电脑移动到别的vlan会多出这样的缓存记录)
- ## -------------------------------------------------------------------------------
- ## Total matching items on slot 0 displayed = 2
- ## <Huawei>
- ## 转换成字典 {'0a00-2700-0010': ['1'], '5489-9836-1256': ['1', '2']}
- ##################################################################################
- def MAC缓存信息转成MAC对应VLAN字典(S_DATA):
- D_MAC_VLAN = {}
- MAC表达式 = '[a-f0-9A-F]{4}\-[a-f0-9A-F]{4}\-[a-f0-9A-F]{4}'
- MAC缓存记录表达式 = MAC表达式 + '(.*)'
- RE_F = re.finditer(MAC缓存记录表达式, S_DATA)
- 列表_MAC缓存信息 = [i.group() for i in RE_F]
- #print("列表_MAC缓存信息", 列表_MAC缓存信息)
- if 列表_MAC缓存信息 != []:
- for i in 列表_MAC缓存信息:
- L = i.split()
- #print("L", L)
- if len(L) > 1: # 能分成2段及以上
- MAC = L[0] # 字符串类型MAC地址
- VLAN = L[1] # 字符串类型的数字(VLAN号)
- #print("MAC", MAC, "VLAN", VLAN)
- if MAC not in D_MAC_VLAN:
- D_MAC_VLAN[MAC] = [VLAN]
- else:
- if VLAN not in D_MAC_VLAN[MAC]:
- D_MAC_VLAN[MAC].append(VLAN)
- #else:
- # pass
- return(D_MAC_VLAN)
-
-
- ################################################################################
- ## 功能函数 VLAN和NET字典()
- ## 把如下字符串内容
- ## <Huawei>display ip interface brief
- ## Interface IP Address/Mask Physical Protocol
- ## MEth0/0/1 unassigned down down
- ## NULL0 unassigned up up(s)
- ## Vlanif10 192.168.10.1/24 down down
- ## Vlanif20 192.168.20.1/24 down down
- ## <Huawei>
- ## 转换成字典 {'Vlanif10': ['192.168.10.1/24'], 'Vlanif20': ['192.168.20.1/24']}
- ################################################################################
- def VLAN和NET字典(S_DATA):
- D_VLAN_IP_MASK = {}
- IP表达式 = '[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+/[0-9]+' # 数字.数字.数字.数字/数字
- 表达式 = '(.*)Vlanif(.*)' + IP表达式
- RE_F = re.finditer(表达式, S_DATA)
- 列表_VLAN_NET = [i.group() for i in RE_F] # 每条匹配内容做成列表形式
- #print("列表_VLAN_NET", 列表_VLAN_NET)
-
- if 列表_VLAN_NET != []:
- for i in 列表_VLAN_NET:
- L = i.split()
- #print("L", L)
- if len(L) > 1: # 能分成2段及以上
- VLAN = L[0] # 字符串类型MAC地址
- NET = L[1] # 字符串类型的数字(VLAN号)
- #print("VLAN", VLAN, "NET", NET)
- if VLAN not in D_VLAN_IP_MASK:
- D_VLAN_IP_MASK[VLAN] = [NET]
- else:
- D_VLAN_IP_MASK[VLAN].append(NET)
- return(D_VLAN_IP_MASK)
-
-
- ###############################################################################
- ## 功能函数 VLAN和IP字典()
- ## 把如下字符串内容
- ## <Huawei>display ip interface brief
- ## Interface IP Address/Mask Physical Protocol
- ## MEth0/0/1 unassigned down down
- ## NULL0 unassigned up up(s)
- ## Vlanif10 192.168.10.1/24 down down
- ## Vlanif20 192.168.20.1/24 down down
- ## <Huawei>
- ## 转换成字典 {'Vlanif10': ['192.168.10.1'], 'Vlanif20': ['192.168.20.1']}
- ###############################################################################
- def VLAN和IP字典(S_DATA):
- D_VLAN_IP = {}
- IP表达式 = '[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+' # 数字.数字.数字.数字
- 表达式 = '(.*)Vlanif(.*)' + IP表达式
- RE_F = re.finditer(表达式, S_DATA)
- 列表_VLAN_IP = [i.group() for i in RE_F] # 每条匹配内容做成列表形式
- #print("列表_VLAN_IP", 列表_VLAN_IP)
-
- if 列表_VLAN_IP != []:
- for i in 列表_VLAN_IP:
- L = i.split()
- #print("L", L)
- if len(L) > 1: # 能分成2段及以上
- VLAN = L[0] # 字符串类型MAC地址
- IP = L[1] # 字符串类型的数字(VLAN号)
- #print("VLAN", VLAN, "IP", IP)
- if VLAN not in D_VLAN_IP:
- D_VLAN_IP[VLAN] = [IP]
- else:
- D_VLAN_IP[VLAN].append(IP)
- return(D_VLAN_IP)
-
-
- ###############################################################################################################################
- ## 尝试把网段表示格式:'IP/MASK' 格式转成元组 (IP_STR, MASK_INT)
- ## 参数 NET_STR 网段表示格式(字符串)例:'192.168.0.0/255.255.255.0' 或 '192.168.0.0/24'
- ## 成功返回 (0, (IP_STR, MASK_INT))
- ## 把 192.168.0.1/24 拆成 '192.168.0.1' 和 24
- ## 把 192.168.0.1/255.255.255.0 拆成 '192.168.0.1' 和 24
- ## 失败返回 (1, 错误信息)
- def TRY_NET_STR_2_IP_MASK(NET_STR):
- if type(NET_STR) == str:
- L_IP_MASK = NET_STR.split('/')
- if len(L_IP_MASK) == 2:
- IP_STR = L_IP_MASK[0]
- MASK_STR = L_IP_MASK[1]
- #print("IP_STR", IP_STR, type(IP_STR))
- #print("MASK_STR", MASK_STR, type(MASK_STR))
- TEST_IP = TEST_IP_STR(IP_STR) ## 判断IP是否正确
- R_TRY_MASK = TRY_MASK_2_MASK_INT(MASK_STR) ## 判断并处理掩码
- if TEST_IP[0] == R_TRY_MASK[0] == 0: ## 当IP和MASK全部合法时计算可用IP地址
- MASK_INT = R_TRY_MASK[1]
- return(0, (IP_STR, MASK_INT))
- else:
- ERROR = 'ERROR 参数格式(字符串):'
- if TEST_IP[0] == 1:
- ERROR += 'IP格式有错误:' + TEST_IP[1]
- if R_TRY_MASK[0] == 1:
- ERROR += '掩码格式有错误:' + R_TRY_MASK[1]
- return(1, ERROR)
- else:
- ERROR = 'ERROR 参数格式(字符串):不能以"/"符号分成2段'
- return(1, ERROR)
- else:
- ERROR = 'ERROR 参数类型:必须是字符串'
- return(1, ERROR)
- ###############################################################################################################################
-
-
- ###############################################################################################################################
- ## 尝试把MAC地址转成指定格式
- ## 参数 MAC_STR 字符串类型的MAC地址,只要含有12个十六进制字符即可
- ## 参数 转换格式 把传入的MAC地址形式转成指定样式,参数默认值为网络设备常用样式:'xxxx-xxxx-xxxx'
- def TRY_MAC_2_MAC(MAC_STR, 转换格式='xxxx-xxxx-xxxx'):
- if type(MAC_STR) == str:
- MAC_STR = re.sub('[^a-fA-F0-9]', '', MAC_STR) # 先删除不是16进制范围的字符
- if len(MAC_STR) == 12: # 判断新的 mac 长度是否正常
- if 转换格式 == 'xxxx-xxxx-xxxx':
- mac = MAC_STR.lower()
- mac_new = mac[0:4] + '-' + mac[4:8] + '-' + mac[8:12] # 改成新格式 xxxx-xxxx-xxxx
- return(0, mac_new)
- elif 转换格式 == 'XXXX-XXXX-XXXX':
- MAC = MAC_STR.upper()
- MAC_NEW = MAC[0:4] + '-' + MAC[4:8] + '-' + MAC[8:12] # 改成新格式 XXXX-XXXX-XXXX
- return(0, MAC_NEW)
- elif 转换格式 == 'xx-xx-xx-xx-xx-xx':
- mac = MAC_STR.lower()
- mac_new = mac[0:2] + '-' + mac[2:4] + '-' + mac[4:6] + '-' + mac[6:8] + '-' + mac[8:10] + '-' + mac[10:12] # 改成新格式 xx-xx-xx-xx-xx-xx
- return(0, mac_new)
- elif 转换格式 == 'XX-XX-XX-XX-XX-XX':
- MAC = MAC_STR.upper()
- MAC_NEW = MAC[0:2] + '-' + MAC[2:4] + '-' + MAC[4:6] + '-' + MAC[6:8] + '-' + MAC[8:10] + '-' + MAC[10:12] # 改成新格式 XX-XX-XX-XX-XX-XX
- return(0, MAC_NEW)
- elif 转换格式 == 'xx:xx:xx:xx:xx:xx':
- mac = MAC_STR.lower()
- mac_new = mac[0:2] + ':' + mac[2:4] + ':' + mac[4:6] + ':' + mac[6:8] + ':' + mac[8:10] + ':' + mac[10:12] # 改成新格式 xx:xx:xx:xx:xx:xx
- return(0, mac_new)
- elif 转换格式 == 'XX:XX:XX:XX:XX:XX':
- MAC = MAC_STR.upper()
- MAC_NEW = MAC[0:2] + ':' + MAC[2:4] + ':' + MAC[4:6] + ':' + MAC[6:8] + ':' + MAC[8:10] + ':' + MAC[10:12] # 改成新格式 XX:XX:XX:XX:XX:XX
- return(0, MAC_NEW)
- else:
- ERROR = '函数 TRY_MAC_2_MAC() 参数 转换格式 的形式未定义'
- return(1, ERROR)
- else:
- ERROR = '函数 TRY_MAC_2_MAC() 参数 MAC_STR 不是有效MAC地址: ' + MAC_STR
- return(1, ERROR)
- else:
- ERROR = '函数 TRY_MAC_2_MAC() 参数 MAC_STR 需要是字符串类型,当前类型是:' + str(type(MAC_STR))
- return(1, ERROR)
- ###############################################################################################################################
-
-
- ###############################################################################################################################
- ## 功能函数:查看交互命令行当前所在模式,如果不是查看模式,执行quit命令直到返回查看模式
- ## 参数 S_DATA 执行交互命令后的回显内容
- ## 参数 SHELL 交互操作子SHELL对象
- ## 参数 最大深度 执行quit命令的最大次数
- def 返回查看模式(S_DATA, SHELL, 最大深度=5):
- if 最大深度 > 0:
- L_S_DATA = S_DATA.split() # 以空格分隔回显内容
- Log.debug('函数 返回查看模式() 的 L_S_DATA 变量值 = ' + str(L_S_DATA))
- 最后内容 = L_S_DATA[-1] # 取最后一段,一般为等待命令的提示符(交换机终端回显日志会有干扰,交换机设置禁止终端打印日志功能)
- 查看模式 = re.search('<(.*)>', 最后内容) # 尝试查找<>结构
- 配置模式 = re.search('\[(.*)\]', 最后内容) # 尝试查找[]结构
- if 配置模式:
- CMD = 'quit'
- S_DATA = SHELL_CMD(SHELL, CMD, 0.5)
- Log.debug('判断是配置模式,执行quit命令,回显内容:' + S_DATA)
- 最大深度 = 最大深度 -1
- R = 返回查看模式(S_DATA, SHELL, 最大深度)
- return(R)
- elif 查看模式:
- INFO = '已经返回到<查看模式>'
- return(0,INFO)
- else:
- ERROR = '最后一行找不到 <> 或 [] 符号'
- return(1,ERROR)
- else:
- ERROR = '已经退出最大深度还没有返回到<查看模式>放弃'
- return(1,ERROR)
- ###############################################################################################################################
-
-
- ###############################################################################################################################
- ## 功能函数:从查询静态绑定信息命令回显中提取以IP为KEY以MAC为值的字典
- ## 参数 S_DATA 执行交互命令后的回显内容
- ## 把如下字符串内容
- ## dis cu | include user-bind static
- ## user-bind static ip-address 1.1.1.1 mac-address 0000-1111-2222
- ## user-bind static ip-address 1.1.1.1 mac-address 0000-0000-0001
- ## user-bind static ip-address 1.1.1.2 mac-address 0000-0000-0002
- ## <Huawei>
- ## 转换成字典 {'1.1.1.1': ['0000-1111-2222', '0000-0000-0001'], '1.1.1.2': ['0000-0000-0002']}
- def IP_MAC_绑定信息转成IP为KEY字典(S_DATA):
- D_IP_MAC = {} # 存放IP为键MAC为值的字典 示例 {'IP1':['MAC'], 'IP2':['MAC1', 'MAC2']}
-
- ## 从命令回显中过滤出纯IP和MAC的绑定记录,并转成列表形式
- 表达式 = '(.*)ip-address(.*)mac-address(.*)' # 匹配绑定记录的正则表达式
- RE_F = re.finditer(表达式, S_DATA) # 找到所有匹配的内容(每条绑定记录)
- 列表_绑定信息 = [i.group() for i in RE_F] # 每条匹配内容做成列表形式
- #print("列表_绑定信息", 列表_绑定信息)
-
- ## 定位IP和MAC各自在绑定记录的位置
- if 列表_绑定信息 != []:
- 首条记录 = 列表_绑定信息[0] # 提取第一条绑定记录
- 列表_首条记录分割 = 首条记录.split() # 第一条绑定记录以空格分段变成列表
- IP表达式 = '[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+' # 匹配 x.x.x.x 类型IP地址的正则表达式
- MAC表达式 = '[a-f0-9A-F]{4}\-[a-f0-9A-F]{4}\-[a-f0-9A-F]{4}' # 匹配 xxxx-xxxx-xxxx 类型MAC地址的正则表达式
- IP_位置 = '' # 准备存放 IP 下标值
- MAC_位置 = '' # 准备存放 MAC 下标值
- for i in range(0, len(列表_首条记录分割)): # 遍历首条绑定记录每一段内容
- RE_IP = re.search(IP表达式, 列表_首条记录分割[i]) # 尝试在当前段中匹配IP地址
- RE_MAC = re.search(MAC表达式, 列表_首条记录分割[i]) # 尝试在当前段中匹配MAC地址
- if RE_IP: # 如果在当前段中匹配到IP地址
- #print(i, "is IP:", 列表_首条记录分割[i])
- IP_位置 = i # 设置'IP_位置'变量值为当前段下标值(数字int类型)
- if RE_MAC: # 如果在当前段中匹配到MAC地址
- #print(i, "is MAC:", 列表_首条记录分割[i])
- MAC_位置 = i # 设置'MAC_位置'变量值为当前段下标值(数字int类型)
-
- ## 逐行提取IP和MAC的值,做成字典形式
- if IP_位置 != '' and MAC_位置 != '': # 当'IP_位置'和'MAC_位置'都找到对应下标时
- for i in 列表_绑定信息: # 遍历每一条绑定记录
- #print("每行内容", i)
- SP = i.split() # 绑定记录以空格分段成列表
- #print("SP", SP)
- IP = SP[IP_位置] # 根据已知下标提取IP的值
- MAC = SP[MAC_位置] # 根据已知下标提取MAC的值
- if IP not in D_IP_MAC: # 如果当前IP不在D_IP_MAC字典中
- D_IP_MAC[IP] = [MAC] # 向D_IP_MAC字典中添加此对键值对,MAC做成列表形式
- else: # 如果当前IP已经存在D_IP_MAC字典中
- D_IP_MAC[IP].append(MAC) # 把MAC添加到对应键值对中
- #print("D_IP_MAC", D_IP_MAC)
- return(D_IP_MAC) # 返回制作好的字典,如果没有绑定记录也会返回空字典
- else: # 当'IP_位置'和'MAC_位置'任意一个没有找到对应下标时
- return(D_IP_MAC) # 直接返回空字典
- else:
- return(D_IP_MAC) # 回显结果中没有找到任何IP和MAC的绑定记录,直接返回空字典
- ###############################################################################################################################
-
-
- ###############################################################################################################################
- ## 功能函数:从查询静态绑定信息命令回显中提取以MAC为KEY以IP为值的字典
- ## 参数 S_DATA 执行交互命令后的回显内容
- ## 把如下字符串内容
- ## dis cu | include user-bind static
- ## user-bind static ip-address 1.1.1.1 mac-address 0000-1111-2222
- ## user-bind static ip-address 1.1.1.1 mac-address 0000-0000-0001
- ## user-bind static ip-address 1.1.1.2 mac-address 0000-0000-0002
- ## <Huawei>
- ## 转换成字典 {'0000-1111-2222': ['1.1.1.1'], '0000-0000-0001': ['1.1.1.1'], '0000-0000-0002': ['1.1.1.2']}
- def IP_MAC_绑定信息转成MAC为KEY字典(S_DATA):
- D_MAC_IP = {}
-
- 表达式 = '(.*)ip-address(.*)mac-address(.*)'
- RE_F = re.finditer(表达式, S_DATA)
- 列表_绑定信息 = [i.group() for i in RE_F]
- #print("列表_绑定信息", 列表_绑定信息)
-
- if 列表_绑定信息 != []:
- 首条记录 = 列表_绑定信息[0]
- 列表_首条记录分割 = 首条记录.split()
- IP表达式 = '[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+'
- MAC表达式 = '[a-f0-9A-F]{4}\-[a-f0-9A-F]{4}\-[a-f0-9A-F]{4}'
- IP_位置 = ''
- MAC_位置 = ''
- for i in range(0, len(列表_首条记录分割)):
- RE_IP = re.search(IP表达式, 列表_首条记录分割[i])
- RE_MAC = re.search(MAC表达式, 列表_首条记录分割[i])
- if RE_IP:
- #print(i, "is IP:", 列表_首条记录分割[i])
- IP_位置 = i
- if RE_MAC:
- #print(i, "is MAC:", 列表_首条记录分割[i])
- MAC_位置 = i
-
- if IP_位置 != '' and MAC_位置 != '':
- for i in 列表_绑定信息:
- #print("每行内容", i)
- SP = i.split()
- #print("SP", SP)
- IP = SP[IP_位置]
- MAC = SP[MAC_位置]
- if MAC not in D_MAC_IP:
- D_MAC_IP[MAC] = [IP]
- else:
- D_MAC_IP[MAC].append(IP)
- #print("D_MAC_IP", D_MAC_IP)
- return(D_MAC_IP)
- else:
- return(D_MAC_IP)
- else:
- return(D_MAC_IP)
- ###############################################################################################################################
-
-
- ###############################################################################################################################
- ## 功能函数:在IP和MAC对应字典中是否出现IP和MAC绑定记录
- ## 参数 D 有IP和MAC对应关系的字典,可以是 D_IP_MAC 或 D_MAC_IP
- ## 参数 IP 点分十进制字符串类型IP地址
- ## 参数 MAC 连字符十六进制字符串类型MAC地址
- def 判断是否绑定成功(D, IP, MAC):
- if D == {}:
- E = '绑定失败:绑定记录是空字典'
- return(1,E)
-
- if IP in D: # 是以IP为key的字典格式
- if MAC in D[IP]:
- INFO = '绑定成功,当前 ' + IP + ' 的绑定MAC信息列表 ' + str(D[IP])
- return(0, INFO)
- else:
- ERROR = '绑定失败,当前 ' + IP + ' 的绑定MAC信息列表 ' + str(D[IP])
- return(1, ERROR)
- elif MAC in D: # 是以MAC为key的字典格式
- if IP in D[MAC]:
- INFO = '绑定成功,当前 ' + MAC + ' 的绑定IP信息列表 ' + str(D[MAC])
- return(0,INFO)
- else:
- ERROR = '绑定失败,当前 ' + MAC + ' 的绑定IP信息列表 ' + str(D[MAC])
- return(1, ERROR)
- else: # 没有IP或MAC的绑定记录
- ERROR = '绑定失败,没有 ' + IP + ' 或 ' + MAC + ' 的任何的绑定记录'
- return(1, ERROR)
- ###############################################################################################################################
-
-
- ###############################################################################################################################
- ## 功能函数 自动查找网段中可用IP_优先自增()
- ## 不填充中间有被删除绑定的IP地址空缺位置
- ## 新MAC分配的IP优先使用已经绑定的最大IP地址的后一个IP地址
- ## 在达到自动分配可用的最大IP后再填充中间空缺IP位置(从小开始填充)
- ###############################################################################################################################
- def 自动查找网段中可用IP_优先自增(NET_IP_STR, NET_MASK_INT, S_DATA, 网段开头保留地址数量=1, 网段末尾保留地址数量=1):
- ## 检查参数是否合法
- if 网段开头保留地址数量 < 0:
- ERROR = '函数参数 网段开头保留地址数量 小于0'
- return(1, ERROR)
- if 网段末尾保留地址数量 < 0:
- ERROR = '函数参数 网段末尾保留地址数量 小于0'
- return(1, ERROR)
- ## 计算可用IP地址范围及判断是否还有可自动分配IP地址
- NET_IP_MIN, NET_IP_MAX = 根据地址和掩码计算起始和结束地址(NET_IP_STR, NET_MASK_INT) ## 网段首末地址:网段号和广播号
- 可自动分配地址最小值 = NET_IP_MIN + 1 + 网段开头保留地址数量 ## 起始IP +1 排除网络号,再加上网段开头保留地址数量
- 可自动分配地址最大值 = NET_IP_MAX - 1 - 网段末尾保留地址数量 ## 结尾IP -1 避开广播号,再减去网段末尾保留地址数量
- #print("可自动分配地址最小值", 可自动分配地址最小值, IP_INT_2_IP_STR(可自动分配地址最小值))
- #print("可自动分配地址最大值", 可自动分配地址最大值, IP_INT_2_IP_STR(可自动分配地址最大值))
- Log.debug('可自动分配地址最小值:' + str(可自动分配地址最小值) + ' ' + IP_INT_2_IP_STR(可自动分配地址最小值))
- Log.debug('可自动分配地址最大值:' + str(可自动分配地址最大值) + ' ' + IP_INT_2_IP_STR(可自动分配地址最大值))
- if 可自动分配地址最小值 > 可自动分配地址最大值:
- ERROR = '没有可自动分配地址:保留地址过多 或 网段太小'
- return(1, ERROR)
-
- ## 开始查找可用IP地址
- 可用_IP_STR = ''
-
- ## 从 S_DATA 绑定信息中提取全部的IP地址
- D_IP_MAC = IP_MAC_绑定信息转成IP为KEY字典(S_DATA)
- L_BIND_IP_STR = [i for i in D_IP_MAC] ## 列表_已绑定的全部IP地址(字符形式)
- #print("L_BIND_IP_STR 已绑定使用地址", L_BIND_IP_STR)
- Log.debug('已绑定使用地址(IP_STR):' + str(L_BIND_IP_STR))
- L_BIND_IP_INT = [IP_STR_2_IP_INT(i) for i in L_BIND_IP_STR] ## 列表_已绑定的全部IP地址(数字形式)
- #print("L_BIND_IP_INT 已绑定使用地址", L_BIND_IP_INT)
- Log.debug('已绑定使用地址(IP_INT):' + str(L_BIND_IP_INT))
-
- ## 自动分配网段可用地址中已经被占用的地址:在已绑定的全部IP地址中有多少个是自动分配网段中可以使用的IP地址
- ## 如果没有,那自动分配网段中的全部地址都可以使用,直接取最小的用
- ## 如果有,
- 列表_网段已被占用地址 = [i for i in L_BIND_IP_INT if 可自动分配地址最小值<=i<=可自动分配地址最大值]
- #print("列表_网段已被占用地址", 列表_网段已被占用地址)
- Log.debug('列表_网段已被占用地址:' + str(列表_网段已被占用地址))
- if 列表_网段已被占用地址 == []:
- #print("网段", NET_IP_STR, NET_MASK_INT, "暂无绑定记录,直接使用网段中可自动分配地址的最小IP地址")
- Log.debug('网段' + NET_IP_STR +'/'+ str(NET_MASK_INT) + '暂无绑定记录,直接使用网段中可自动分配地址的最小IP地址')
- 可用_IP_INT = 可自动分配地址最小值 ## 直接使用网段中可自动分配地址的最小IP地址
- 可用_IP_STR = IP_INT_2_IP_STR(可用_IP_INT) ## 数字型IP地址转成字符型IP地址
- return(0, 可用_IP_STR)
- else:
- 网段已被占用地址_MAX = max(列表_网段已被占用地址) ## 提取网段中已经使用的最大IP值
- if 网段已被占用地址_MAX +1 <= 可自动分配地址最大值: ## 如果网段中已经使用的最大IP值增加1还自动分配可用地址范围内
- 可用_IP_INT = 网段已被占用地址_MAX +1 ## 可用IP就取网段中已经使用的最大IP的后一个IP地址
- 可用_IP_STR = IP_INT_2_IP_STR(可用_IP_INT)
- #print("使用已经绑定IP的后一个IP地址", 可用_IP_STR)
- Log.debug('使用已经绑定IP的后一个IP地址:' + 可用_IP_STR)
- return(0, 可用_IP_STR)
- else: ## 否则表示已经到顶,返回去看看有没有解绑掉的空缺IP地址
- #print("自增方式自动分配IP已经到顶,开始从头查看是否有已经解绑的空缺IP地址")
- 集合_网段已被占用地址 = set(列表_网段已被占用地址) ## 转成集合判断元素是否存在会快一点
- #print("集合_网段已被占用地址", 集合_网段已被占用地址)
- for i in range(可自动分配地址最小值, 可自动分配地址最大值+1): ## 从小到大遍历自动分配可用地址
- #print("遍历可网段中可自动分配IP地址", i, IP_INT_2_IP_STR(i))
- if i not in 集合_网段已被占用地址: ## 如果遍历到的这个IP地址不在已经绑定地址集合中
- 可用_IP_STR = IP_INT_2_IP_STR(i) ## 遍历到的这个IP地址就是可用地址,转成字符形式IP地址
- #print("找到空缺可用IP地址", 可用_IP_STR)
- Log.debug('找到空缺可用IP地址:' + 可用_IP_STR)
- break ## 找到了就终止循环
- ## 判断捡漏查找结果
- if 可用_IP_STR == '':
- ERROR = '无可用地址:网段可自动分配区间地址已经全部被使用,可以尝试扩大网段或减少保留地址数量'
- return(1, ERROR)
- else:
- return(0, 可用_IP_STR)
-
-
- ###############################################################################################################################
- ## 功能函数 自动查找网段中可用IP_自动填充()
- ## 在已用IP范围外和自动分配地址范围内找到最小可用IP
- ## 会填充自动分配地址范围内有被删除IP的空缺位置
- ###################################################
- def 自动查找网段中可用IP_自动填充(NET_IP_STR, NET_MASK_INT, S_DATA, 网段开头保留地址数量=1, 网段末尾保留地址数量=1):
- ## 检查参数是否合法
- if 网段开头保留地址数量 < 0:
- ERROR = '函数参数 网段开头保留地址数量 小于0'
- return(1, ERROR)
- if 网段末尾保留地址数量 < 0:
- ERROR = '函数参数 网段末尾保留地址数量 小于0'
- return(1, ERROR)
- ## 计算可用IP地址范围及判断是否还有可自动分配IP地址
- NET_IP_MIN, NET_IP_MAX = 根据地址和掩码计算起始和结束地址(NET_IP_STR, NET_MASK_INT) ## 网段首末地址:网段号和广播号
- 可自动分配地址最小值 = NET_IP_MIN + 1 + 网段开头保留地址数量 ## 起始IP +1 排除网络号,再加上网段开头保留地址数量
- 可自动分配地址最大值 = NET_IP_MAX - 1 - 网段末尾保留地址数量 ## 结尾IP -1 避开广播号,再减去网段末尾保留地址数量
- #print("可自动分配地址最小值", 可自动分配地址最小值, IP_INT_2_IP_STR(可自动分配地址最小值))
- #print("可自动分配地址最大值", 可自动分配地址最大值, IP_INT_2_IP_STR(可自动分配地址最大值))
- Log.debug('可自动分配地址最小值:' + str(可自动分配地址最小值) + ' ' + IP_INT_2_IP_STR(可自动分配地址最小值))
- Log.debug('可自动分配地址最大值:' + str(可自动分配地址最大值) + ' ' + IP_INT_2_IP_STR(可自动分配地址最大值))
- if 可自动分配地址最小值 > 可自动分配地址最大值:
- ERROR = '没有可自动分配地址:保留地址过多 或 网段太小'
- return(1, ERROR)
-
- ## 开始查找可用IP地址
- 可用_IP_STR = ''
-
- ## 从 S_DATA 绑定信息中提取全部的IP地址
- D_IP_MAC = IP_MAC_绑定信息转成IP为KEY字典(S_DATA)
- L_BIND_IP_STR = [i for i in D_IP_MAC] ## 列表_已绑定的全部IP地址(字符形式)
- #print("L_BIND_IP_STR 已绑定使用地址", L_BIND_IP_STR)
- Log.debug('已绑定使用地址(IP_STR):' + str(L_BIND_IP_STR))
- L_BIND_IP_INT = [IP_STR_2_IP_INT(i) for i in L_BIND_IP_STR] ## 列表_已绑定的全部IP地址(数字形式)
- #print("L_BIND_IP_INT 已绑定使用地址", L_BIND_IP_INT)
- Log.debug('已绑定使用地址(IP_INT):' + str(L_BIND_IP_INT))
-
- ## 从小到大遍历可以自动分配的地址范围,判断是否被使用,尝试找出没有被使用的最小IP,作为自动绑定的可用IP
- 列表_网段已被占用地址 = [i for i in L_BIND_IP_INT if 可自动分配地址最小值<=i<=可自动分配地址最大值]
- #print("列表_网段已被占用地址", 列表_网段已被占用地址)
- 集合_网段已被占用地址 = set(列表_网段已被占用地址) ## 转成集合判断元素是否存在会快一点
- #print("集合_网段已被占用地址", 集合_网段已被占用地址)
- for i in range(可自动分配地址最小值, 可自动分配地址最大值+1): ## 从小到大遍历自动分配可用地址
- #print("遍历可网段中可自动分配IP地址", i, IP_INT_2_IP_STR(i))
- if i not in 集合_网段已被占用地址: ## 如果遍历到的这个IP地址不在已经绑定地址集合中
- 可用_IP_STR = IP_INT_2_IP_STR(i) ## 遍历到的这个IP地址就是可用地址,转成字符形式IP地址
- #print("找到空缺可用IP地址", 可用_IP_STR)
- Log.debug('找到空缺可用IP地址:' + 可用_IP_STR)
- break ## 找到了就终止循环
- ## 判断捡漏查找结果
- if 可用_IP_STR == '':
- ERROR = '无可用地址:网段可自动分配区间地址已经全部被使用,可以尝试扩大网段或减少保留地址数量'
- return(1, ERROR)
- else:
- return(0, 可用_IP_STR)
- ###############################################################################################################################
-
-
-
-
-
- '''
- ########################
- ## 任务函数,完成任务 ##
- ########################
- '''
-
- ####################################################################################################
- ## 任务函数:保存配置()
- ####################################################################################################
-
- ## 任务控制部分(负责检查参数,控制SSH连接的打开和关闭,调用相应交互操作命令执行任务)
- def 保存配置(D_LOGIN_INFO):
- 测试结果,结果说明 = TEST_D_LOGIN_INFO(D_LOGIN_INFO)
- if 测试结果 == 0:
- R = SSH_LOGIN(D_LOGIN_INFO) # 尝试登录SSH设备
- if R[0] == 0: # 登录成功
- SSH = R[1] # 获取SSH连接对象
- SHELL = R[2] # 获取可交互操作的子SHELL对象
- RR = SHELL_SAVE(SSH, SHELL) # 调用相应交互操作命令执行任务
- SSH.close() # 任务完成后断开SSH连接
- return(RR) # 把任务执行结果返回给上级函数
- else:
- return(1, R[1]) # 返回失败代码和错误信息
- else:
- return(1, 结果说明)
-
- ## 任务实现部分(执行交互操作命令,根据回显结果进行处理,返回处理状态和处理结果,DEBUG日志)
- def SHELL_SAVE(SSH, SHELL):
- ## 判断当前是否是<>模式
- CMD = 'display clock' # 任意命令,拿到回显最后一行,判断是 <> 还是 []
- S_DATA = SHELL_CMD(SHELL, CMD, 0.5)
- Log.debug('函数 SHELL_SAVE() 的 display clock 命令回显:' + S_DATA)
- code,data = 返回查看模式(S_DATA, SHELL, 最大深度=5) # 调用 返回查看模式() 这个交换机需要在查看模式才能保存配置
- if code == 0:
- Log.debug(data + '开始执行 save 命令')
- CMD = 'save'
- S_DATA = SHELL_CMD(SHELL, CMD, 0.5)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA)
- 正则表达式 = '(.*)Are you sure to continue(.*)'
- RE_S = re.search(正则表达式, S_DATA)
- if RE_S:
- CMD = 'y'
- S_DATA = SHELL_CMD(SHELL, CMD, 2)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA)
- ## 如果遇到保存名字,直接回车
- 正则表达式 = '(.*)Please input the file name(.*)'
- RE_S = re.search(正则表达式, S_DATA)
- if RE_S:
- CMD = '' ## 无命令,会直接输入回车
- S_DATA = SHELL_CMD(SHELL, CMD, 2)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA)
- ## 查看是否出现保存成功的字样,部分机器慢,需要设置一个长一点的等待时间
- 保存成功字符串 = re.search('(.*)successfully(.*)', S_DATA) # 检查回显中是否有成功字样出现
- if 保存成功字符串:
- INFO = '保存成功'
- return(0, INFO)
- else:
- ERROR = '未发现successfully字符,可能保存时间过久,命令暂停时间不够,请开启日志DEBUG模式查看操作过程'
- return(1, ERROR)
- else:
- ERROR = '回显内容中没有匹配到' + 正则表达式
- return(1, ERROR)
- else:
- ERROR = '返回<查看模式失败>失败信息:' + data
- return(1, ERROR)
- ####################################################################################################
-
-
-
- ####################################################################################################
- ## 任务函数:获取IP和MAC绑定信息()
- ####################################################################################################
-
- ## 任务控制部分(负责检查参数,控制SSH连接的打开和关闭,调用相应交互操作命令执行任务)
- def 获取IP和MAC绑定信息(D_LOGIN_INFO):
- 测试结果,结果说明 = TEST_D_LOGIN_INFO(D_LOGIN_INFO)
- if 测试结果 == 0:
- R = SSH_LOGIN(D_LOGIN_INFO)
- if R[0] == 0:
- SSH = R[1]
- SHELL = R[2]
- RR = SHELL_D_IP_MAC(SSH, SHELL)
- SSH.close()
- return(RR)
- else:
- return(1, R[1])
- else:
- return(1, 结果说明)
-
- ## 任务实现部分(执行交互操作命令,根据回显结果进行处理,返回处理状态和处理结果,DEBUG日志)
- def SHELL_D_IP_MAC(SSH, SHELL):
- CMD = 'screen-length 0 temporary'
- S_DATA = SHELL_CMD(SHELL, CMD, 0.3)
- CMD = 'dis cu | include user-bind static' # 准备设备执行交互命令:查看IP和MAC绑定信息
- S_DATA = SHELL_CMD(SHELL, CMD, 2)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA) ## DEBUG
- D_IP_MAC = IP_MAC_绑定信息转成IP为KEY字典(S_DATA) # 调用功能函数处理回显内容
- return(0,D_IP_MAC) # 返回成功代码和处理结果
- ####################################################################################################
-
-
-
- ####################################################################################################
- ## 任务函数:绑定指定IP和指定MAC()
- ####################################################################################################
-
- ## 任务控制部分(负责检查参数,控制SSH连接的打开和关闭,调用相应交互操作命令执行任务)
- def 绑定指定IP和指定MAC(D_LOGIN_INFO, Bind_IP, Bind_MAC):
- ## 检查参数IP地址是否合法
- IP_TEST = TEST_IP_STR(Bind_IP)
- if IP_TEST[0] != 0:
- return(IP_TEST) # 不合法立即终止
- ## 检查参数MAC地址是否合法
- R_TRY = TRY_MAC_2_MAC(Bind_MAC, 转换格式='xxxx-xxxx-xxxx')
- if R_TRY[0] != 0:
- return(R_TRY) # 不合法立即终止
- else:
- Bind_MAC = R_TRY[1] # 不管用户输入什么格式,都转成'xxxx-xxxx-xxxx'格式
-
- 测试结果,结果说明 = TEST_D_LOGIN_INFO(D_LOGIN_INFO)
- if 测试结果 == 0:
- R = SSH_LOGIN(D_LOGIN_INFO)
- if R[0] == 0:
- SSH = R[1]
- SHELL = R[2]
- RR = SHELL_Bind_IP_MAC(SSH, SHELL, Bind_IP, Bind_MAC)
- SSH.close()
- return(RR)
- else:
- return(1, R[1])
- else:
- return(1, 结果说明)
-
- ## 任务实现部分(执行交互操作命令,根据回显结果进行处理,返回处理状态和处理结果,DEBUG日志)
- def SHELL_Bind_IP_MAC(SSH, SHELL, Bind_IP, Bind_MAC):
- ## 开始按顺序执行交互命令
- CMD = 'screen-length 0 temporary'
- S_DATA = SHELL_CMD(SHELL, CMD, 0.5)
- CMD = 'sys' # 准备设备执行交互命令:进入配置模式
- S_DATA = SHELL_CMD(SHELL, CMD, 0.5)
- CMD = 'user-bind static ip-address ' + Bind_IP + ' mac-address ' + Bind_MAC # 准备设备执行交互命令:绑定IP和MAC
- S_DATA = SHELL_CMD(SHELL, CMD, 0.5)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA) ## DEBUG
- ## 判断此次绑定任务是否成功
- CMD = 'dis cu | include user-bind static ip-address ' + Bind_IP # 准备设备执行交互命令:检查是否成功绑定
- S_DATA = SHELL_CMD(SHELL, CMD, 1)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA) ## DEBUG
- D_IP_MAC = IP_MAC_绑定信息转成IP为KEY字典(S_DATA)
- RR = 判断是否绑定成功(D_IP_MAC, Bind_IP, Bind_MAC)
- return(RR)
- ####################################################################################################
-
-
-
- ####################################################################################################
- ## 任务函数:在指定网段自动选择可用IP和指定MAC绑定()
- ####################################################################################################
-
- ## 任务控制部分(负责检查参数,控制SSH连接的打开和关闭,调用相应交互操作命令执行任务)
- def 在指定网段自动选择可用IP和指定MAC绑定(D_LOGIN_INFO, Bind_NET, Bind_MAC):
- ## 检查参数,尝试把字符串表示的网段转成网段IP_STR和掩码MASK_INT
- R_TRY_NET = TRY_NET_STR_2_IP_MASK(Bind_NET)
- if R_TRY_NET[0] != 0:
- return(1, R_TRY_NET[1])
- else:
- Bind_NET_IP_STR = R_TRY_NET[1][0]
- Bind_NET_MASK_INT = R_TRY_NET[1][1]
- ## 检查参数,尝试把用户输入的MAC地址转成交换机常用MAC格式
- R_TRY_MAC = TRY_MAC_2_MAC(Bind_MAC, 转换格式='xxxx-xxxx-xxxx')
- if R_TRY_MAC[0] != 0:
- return(1, R_TRY_MAC[1])
- else:
- Bind_MAC = R_TRY_MAC[1]
-
- 测试结果,结果说明 = TEST_D_LOGIN_INFO(D_LOGIN_INFO)
- if 测试结果 == 0:
- R = SSH_LOGIN(D_LOGIN_INFO)
- if R[0] == 0:
- SSH = R[1]
- SHELL = R[2]
- RR = SHELL_Bind_NET_MAC(SSH, SHELL, Bind_NET_IP_STR, Bind_NET_MASK_INT, Bind_MAC)
- SSH.close()
- return(RR)
- else:
- return(1, R[1])
- else:
- return(1, 结果说明)
-
- ## 任务实现部分(执行交互操作命令,根据回显结果进行处理,返回处理状态和处理结果,DEBUG日志)
- def SHELL_Bind_NET_MAC(SSH, SHELL, Bind_NET_IP_STR, Bind_NET_MASK_INT, Bind_MAC):
- ## 开始按顺序执行交互命令
- CMD = 'screen-length 0 temporary'
- S_DATA = SHELL_CMD(SHELL, CMD, 0.5)
- CMD = 'sys'
- S_DATA = SHELL_CMD(SHELL, CMD, 0.5)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA) ## DEBUG
- CMD = 'dis cu | include user-bind static'
- S_DATA = SHELL_CMD(SHELL, CMD, 2)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA) ## DEBUG
- 状态_搜索可用IP, 结果_搜索可用IP = 自动查找网段中可用IP_优先自增(Bind_NET_IP_STR, Bind_NET_MASK_INT, S_DATA)
- if 状态_搜索可用IP == 0:
- Bind_IP = 结果_搜索可用IP
- RR = SHELL_Bind_IP_MAC(SSH, SHELL, Bind_IP, Bind_MAC)
- return(RR)
- else:
- return(1, 结果_搜索可用IP)
- ####################################################################################################
-
-
-
- ####################################################################################################
- ## 任务函数:查找MAC地址所在网段()
- ####################################################################################################
-
- ## 任务控制部分(负责检查参数,控制SSH连接的打开和关闭,调用相应交互操作命令执行任务)
- def 查找MAC地址所在网段(D_LOGIN_INFO, MAC):
- R_TRY_MAC = TRY_MAC_2_MAC(MAC, 转换格式='xxxx-xxxx-xxxx')
- if R_TRY_MAC[0] != 0:
- return(1, R_TRY_MAC[1])
- else:
- MAC = R_TRY_MAC[1]
-
- 测试结果,结果说明 = TEST_D_LOGIN_INFO(D_LOGIN_INFO)
- if 测试结果 == 0:
- R = SSH_LOGIN(D_LOGIN_INFO)
- if R[0] == 0:
- SSH = R[1]
- SHELL = R[2]
- RR = SHELL_MAC_IN_VLAN(SSH, SHELL, MAC)
- SSH.close()
- return(RR)
- else:
- return(1, R[1])
- else:
- return(1, 结果说明)
-
- ## 任务实现部分(执行交互操作命令,根据回显结果进行处理,返回处理状态和处理结果,DEBUG日志)
- def SHELL_MAC_IN_VLAN(SSH, SHELL, MAC):
- CMD = 'display mac-address ' + MAC
- S_DATA = SHELL_CMD(SHELL, CMD, 1)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA) ## DEBUG
- D_MAC_VLAN = MAC缓存信息转成MAC对应VLAN字典(S_DATA)
- if MAC in D_MAC_VLAN:
- 列表_VLAN = D_MAC_VLAN[MAC]
- if len(列表_VLAN) == 1:
- VLAN_ID_STR = 列表_VLAN[0]
- return(0, VLAN_ID_STR)
- else:
- E = 'MAC地址 ' + str(MAC) + ' 在VLAN ' + str(列表_VLAN) + ' 间漂移,当前无法确定VLAN号'
- return(1, E)
- else:
- E = '在MAC地址缓存中没有找到 ' + str(MAC)
- return(1, E)
- ####################################################################################################
-
-
-
- ####################################################################################################
- ## 任务函数:在MAC所在网段自动选择可用IP和进行绑定()
- ####################################################################################################
-
- ## 任务控制部分(负责检查参数,控制SSH连接的打开和关闭,调用相应交互操作命令执行任务)
- def 在MAC所在网段自动选择可用IP和进行绑定(D_LOGIN_INFO, Bind_MAC):
- R_TRY_MAC = TRY_MAC_2_MAC(Bind_MAC, 转换格式='xxxx-xxxx-xxxx')
- if R_TRY_MAC[0] != 0:
- return(1, R_TRY_MAC[1])
- else:
- Bind_MAC = R_TRY_MAC[1]
-
- 测试结果,结果说明 = TEST_D_LOGIN_INFO(D_LOGIN_INFO)
- if 测试结果 == 0:
- R = SSH_LOGIN(D_LOGIN_INFO)
- if R[0] == 0:
- SSH = R[1]
- SHELL = R[2]
- RR = SHELL_AUTO_Bind_MAC(SSH, SHELL, Bind_MAC)
- SSH.close()
- return(RR)
- else:
- return(1, R[1])
- else:
- return(1, 结果说明)
-
- ## 任务实现部分(执行交互操作命令,根据回显结果进行处理,返回处理状态和处理结果,DEBUG日志)
- def SHELL_AUTO_Bind_MAC(SSH, SHELL, Bind_MAC):
- CMD = 'screen-length 0 temporary'
- S_DATA = SHELL_CMD(SHELL, CMD, 0.3)
-
- CMD = 'display ip interface brief' # 交换机命令:查看VLAN的接口IP
- S_DATA = SHELL_CMD(SHELL, CMD, 1)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA) ## DEBUG
- D_VLAN_IP_MASK = VLAN和NET字典(S_DATA)
- if D_VLAN_IP_MASK == {}:
- E = '查不到任何VLAN的接口IP'
- return(1,E)
-
- CMD = 'display mac-address ' + Bind_MAC # 交换机命令:查找MAC的网络信息
- S_DATA = SHELL_CMD(SHELL, CMD, 1)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA) ## DEBUG
- D_MAC_VLAN = MAC缓存信息转成MAC对应VLAN字典(S_DATA)
- if Bind_MAC in D_MAC_VLAN:
- #print(Bind_MAC, "VLAN缓存记录", D_MAC_VLAN[Bind_MAC])
- 列表_VLAN = D_MAC_VLAN[Bind_MAC]
- if len(列表_VLAN) == 1:
- #print("查VLAN网段")
- S_VLAN_ID = 'Vlanif' + 列表_VLAN[0]
- if S_VLAN_ID in D_VLAN_IP_MASK:
- Bind_NET = D_VLAN_IP_MASK[S_VLAN_ID][0] # 例 ['192.168.0.1/24'] 取出 '192.168.0.1/24'
- Bind_NET_SP = Bind_NET.split('/')
- Bind_NET_IP_STR = Bind_NET_SP[0]
- Bind_NET_MASK_INT = int(Bind_NET_SP[1])
- RR = SHELL_Bind_NET_MAC(SSH, SHELL, Bind_NET_IP_STR, Bind_NET_MASK_INT, Bind_MAC)
- return(RR)
- else:
- E = 'VLAN ' + S_VLAN_ID + ' 没有接口IP地址'
- return(1,E)
- else:
- E = 'MAC地址 ' + str(Bind_MAC) + ' 在VLAN' + str(列表_VLAN) + '间漂移,当前无法确定VLAN号'
- return(1,E)
- else:
- E = '在MAC地址缓存中没有找到 ' + str(MAC)
- return(1,E)
- ####################################################################################################
-
-
-
- ####################################################################################################
- ## 任务函数:解除指定IP和指定MAC的绑定()
- ####################################################################################################
-
- ## 任务控制部分(负责检查参数,控制SSH连接的打开和关闭,调用相应交互操作命令执行任务)
- def 解除指定IP和指定MAC的绑定(D_LOGIN_INFO, Undo_Bind_IP, Undo_Bind_MAC):
- ## 检查参数IP地址是否合法
- IP_TEST = TEST_IP_STR(Undo_Bind_IP)
- if IP_TEST[0] != 0:
- return(IP_TEST) # 不合法立即终止
- ## 检查参数MAC地址是否合法
- R_TRY = TRY_MAC_2_MAC(Undo_Bind_MAC, 转换格式='xxxx-xxxx-xxxx')
- if R_TRY[0] != 0:
- return(R_TRY) # 不合法立即终止
- else:
- Undo_Bind_MAC = R_TRY[1] # 不管用户输入什么格式,都转成'xxxx-xxxx-xxxx'格式
-
- 测试结果,结果说明 = TEST_D_LOGIN_INFO(D_LOGIN_INFO)
- if 测试结果 == 0:
- R = SSH_LOGIN(D_LOGIN_INFO)
- if R[0] == 0:
- SSH = R[1]
- SHELL = R[2]
- RR = SHELL_UNDO_Bind_IP_MAC(SSH, SHELL, Undo_Bind_IP, Undo_Bind_MAC)
- SSH.close()
- return(RR)
- else:
- return(1, R[1])
- else:
- return(1, 结果说明)
-
- ## 任务实现部分(执行交互操作命令,根据回显结果进行处理,返回处理状态和处理结果,DEBUG日志)
- def SHELL_UNDO_Bind_IP_MAC(SSH, SHELL, Undo_Bind_IP, Undo_Bind_MAC):
- CMD = 'sys'
- S_DATA = SHELL_CMD(SHELL, CMD, 0.5)
- CMD = 'dis cu | include user-bind static ip-address ' + Undo_Bind_IP
- S_DATA = SHELL_CMD(SHELL, CMD, 1)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA) ## DEBUG
- D_IP_MAC = IP_MAC_绑定信息转成IP为KEY字典(S_DATA)
- if Undo_Bind_IP in D_IP_MAC:
- if Undo_Bind_MAC in D_IP_MAC[Undo_Bind_IP]:
- CMD = 'undo user-bind static ip-address ' + Undo_Bind_IP + ' mac-address ' + Undo_Bind_MAC
- S_DATA = SHELL_CMD(SHELL, CMD, 1)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA) ## DEBUG
- CMD = 'dis cu | include user-bind static ip-address ' + Undo_Bind_IP
- S_DATA = SHELL_CMD(SHELL, CMD, 1)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA) ## DEBUG
- D_IP_MAC_复查 = IP_MAC_绑定信息转成IP为KEY字典(S_DATA)
- if Undo_Bind_IP in D_IP_MAC_复查:
- if Undo_Bind_MAC in D_IP_MAC_复查[Undo_Bind_IP]:
- ERROR = 'ERROR IP ' + Undo_Bind_IP + ' 和 ' + Undo_Bind_MAC + ' 解绑失败'
- return(1,ERROR)
- else:
- INFO = 'INFO IP ' + Undo_Bind_IP + ' 和 ' + Undo_Bind_MAC + ' 解绑成功,剩余MAC绑定 ' + str(D_IP_MAC_复查[Undo_Bind_IP])
- return(0,INFO)
- else:
- INFO = 'INFO IP ' + Undo_Bind_IP + ' 解绑成功,且已无MAC绑定记录'
- return(0,INFO)
- else:
- WARNING = 'WARNING IP ' + Undo_Bind_IP + ' 原先就没有和 ' + Undo_Bind_MAC + '的绑定记录'
- return(0,WARNING)
- else:
- WARNING = 'WARNING IP ' + Undo_Bind_IP + ' 原先就无MAC绑定记录'
- return(0,WARNING)
- ####################################################################################################
-
-
-
- ####################################################################################################
- ## 任务函数:解除指定IP的全部MAC绑定()
- ####################################################################################################
-
- ## 任务控制部分(负责打开和关闭SSH连接,调用相应交互操作命令)
- def 解除指定IP的全部MAC绑定(D_LOGIN_INFO, Undo_Bind_IP):
- ## 检查参数IP地址是否合法
- IP_TEST = TEST_IP_STR(Undo_Bind_IP)
- if IP_TEST[0] != 0:
- return(IP_TEST) # 不合法立即终止
-
- 测试结果,结果说明 = TEST_D_LOGIN_INFO(D_LOGIN_INFO)
- if 测试结果 == 0:
- R = SSH_LOGIN(D_LOGIN_INFO)
- if R[0] == 0:
- SSH = R[1]
- SHELL = R[2]
- RR = SHELL_UNDO_Bind_IP(SSH, SHELL, Undo_Bind_IP)
- SSH.close()
- return(RR)
- else:
- return(1, R[1])
- else:
- return(1, 结果说明)
-
- ## 任务实现部分(执行交互操作命令,根据回显结果进行处理,返回处理状态和处理结果)
- def SHELL_UNDO_Bind_IP(SSH, SHELL, Undo_Bind_IP):
- CMD = 'screen-length 0 temporary'
- S_DATA = SHELL_CMD(SHELL, CMD, 0.5)
- CMD = 'sys'
- S_DATA = SHELL_CMD(SHELL, CMD, 0.5)
- CMD = 'dis cu | include user-bind static ip-address ' + Undo_Bind_IP
- S_DATA = SHELL_CMD(SHELL, CMD, 1)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA) ## DEBUG
- D_IP_MAC = IP_MAC_绑定信息转成IP为KEY字典(S_DATA)
- if Undo_Bind_IP in D_IP_MAC:
- for MAC in D_IP_MAC[Undo_Bind_IP]:
- CMD = 'undo user-bind static ip-address ' + Undo_Bind_IP + ' mac-address ' + MAC
- S_DATA = SHELL_CMD(SHELL, CMD, 1)
- ## 复查删除结果
- CMD = 'dis cu | include user-bind static ip-address ' + Undo_Bind_IP
- S_DATA = SHELL_CMD(SHELL, CMD, 1)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA) ## DEBUG
- D_IP_MAC_复查 = IP_MAC_绑定信息转成IP为KEY字典(S_DATA)
- if Undo_Bind_IP in D_IP_MAC_复查:
- ERROR = 'ERROR IP ' + Undo_Bind_IP + ' 解绑全部MAC失败,剩余MAC绑定 ' + str(D_IP_MAC_复查[Undo_Bind_IP])
- return(1,ERROR)
- else:
- INFO = 'INFO IP ' + Undo_Bind_IP + ' 解绑全部MAC成功,已解除绑定的MAC列表 ' + str(D_IP_MAC[Undo_Bind_IP])
- return(0,INFO)
- else:
- WARNING = 'WARNING IP ' + Undo_Bind_IP + ' 原先就无MAC绑定记录'
- return(0,WARNING)
- ####################################################################################################
-
-
-
- ####################################################################################################
- ## 任务函数:解除指定MAC的全部IP绑定()
- ####################################################################################################
-
- ## 任务控制部分(负责打开和关闭SSH连接,调用相应交互操作命令)
- def 解除指定MAC的全部IP绑定(D_LOGIN_INFO, Undo_Bind_MAC):
- ## 检查参数MAC地址是否合法
- R_TRY = TRY_MAC_2_MAC(Undo_Bind_MAC, 转换格式='xxxx-xxxx-xxxx')
- if R_TRY[0] != 0:
- return(R_TRY) # 不合法立即终止
- else:
- Undo_Bind_MAC = R_TRY[1] # 不管用户输入什么格式,都转成'xxxx-xxxx-xxxx'格式
-
- 测试结果,结果说明 = TEST_D_LOGIN_INFO(D_LOGIN_INFO)
- if 测试结果 == 0:
- R = SSH_LOGIN(D_LOGIN_INFO)
- if R[0] == 0:
- SSH = R[1]
- SHELL = R[2]
- code,data = SHELL_UNDO_Bind_MAC(SSH, SHELL, Undo_Bind_MAC)
- SSH.close()
- return(code,data)
- else:
- return(1, R[1])
- else:
- return(1, 结果说明)
-
- ## 任务实现部分(执行交互操作命令,根据回显结果进行处理,返回处理状态和处理结果)
- def SHELL_UNDO_Bind_MAC(SSH, SHELL, Undo_Bind_MAC):
- CMD = 'screen-length 0 temporary'
- S_DATA = SHELL_CMD(SHELL, CMD, 0.5)
- CMD = 'sys'
- S_DATA = SHELL_CMD(SHELL, CMD, 0.5)
- CMD = 'dis cu | include user-bind static'
- S_DATA = SHELL_CMD(SHELL, CMD, 1)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA) ## DEBUG
- D_MAC_IP = IP_MAC_绑定信息转成MAC为KEY字典(S_DATA)
- if Undo_Bind_MAC in D_MAC_IP:
- for IP in D_MAC_IP[Undo_Bind_MAC]:
- CMD = 'undo user-bind static ip-address ' + IP + ' mac-address ' + Undo_Bind_MAC
- S_DATA = SHELL_CMD(SHELL, CMD, 1)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA) ## DEBUG
- ## 复查删除结果
- CMD = 'dis cu | include user-bind static'
- S_DATA = SHELL_CMD(SHELL, CMD, 1)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 ' + CMD + ' 命令回显:' + S_DATA) ## DEBUG
- D_MAC_IP_复查 = IP_MAC_绑定信息转成MAC为KEY字典(S_DATA)
- if Undo_Bind_MAC in D_MAC_IP_复查:
- ERROR = 'ERROR MAC ' + Undo_Bind_MAC + ' 解绑全部IP失败,剩余IP绑定 ' + str(D_MAC_IP_复查[Undo_Bind_MAC])
- return(1,ERROR)
- else:
- INFO = 'INFO MAC ' + Undo_Bind_MAC + ' 解绑全部IP成功,已解除绑定的IP列表 ' + str(D_MAC_IP[Undo_Bind_MAC])
- return(0,INFO)
- else:
- WARNING = 'WARNING MAC ' + Undo_Bind_MAC + ' 原先就无IP绑定记录'
- return(0,WARNING)
- ####################################################################################################
文件三代码:
- #_*_ coding:utf8 _*_
- from DEV_SNMP import * # 导入编写在另一个文件里的SNMP操作函数
- import sys # 本例中用于获取函数名写入日志文件
- import logging # 日志模块
- Log = logging.getLogger("__name__")
- ## 如果日志实例不存在,就使用本程序日志模块
- if not logging.FileHandler:
- formatter = logging.Formatter('%(asctime)s %(levelname)-8s : %(message)s') # 指定logger输出格式
- file_handler = logging.FileHandler("DEF_SNMP_eNSP_Switch_S5700.log") # 日志文件路径
- file_handler.setFormatter(formatter) # 可以通过setFormatter指定输出格式
- Log.addHandler(file_handler)
- Log.setLevel(logging.DEBUG)
-
-
- ## 函数调用关系
- '''
- DEF_SNMP_eNSP_Switch_S5700.py
- from DEV_SNMP import * # 需要导入编写DEV_SNMP.py文件里的SNMP操作函数
- import sys # 本文件中只用于获取函数名方便生成日志内容
- import logging # 日志模块
- SNMP_MAC_2_mac(SNMP_MAC) ## SNMP获取的MAC地址是 '00 00 00 00 00 00' 形式,需要转成 '0000-0000-0000' 形式
- SNMP_L_Bind_IP_STR(SNMP_PWD, SNMP_HOST) ## 获取静态绑定IP列表
- SNMP_L_Bind_MAC_STR(SNMP_PWD, SNMP_HOST) ## 获取静态绑定MAC列表
- SNMP_L_IF_IP_STR(SNMP_PWD, SNMP_HOST) ## 获取设备接口IP列表(VLAN接口、物理端口、特殊接口)
- SNMP_L_IF_MASK_STR(SNMP_PWD, SNMP_HOST) ## 获取设备接口IP的掩码列表(VLAN接口、物理端口、特殊接口)
- SNMP_L_IF_INDEX_STR(SNMP_PWD, SNMP_HOST) ## 获取设备接口编号(VLAN接口、物理端口、特殊接口)
- SNMP_D_IF_DESCR(SNMP_PWD, SNMP_HOST) ## 获取设备接口名称字典(VLAN接口、物理端口、特殊接口)## {'1':InLoopBack0, '2':'NULL0', '3':'Console9/0/0', '4':'MEth0/0/1', '5':'Vlanif1', '6':'GigabitEthernet0/0/1'}
- SNMP_生成以IP为键以MAC地址列表为值的字典(SNMP_PWD, SNMP_HOST) ## 以IP为键以MAC地址列表为值的字典 {IP_STR:[MAC_STR, MAC_STR]}
- SNMP_L_Bind_IP_STR(SNMP_PWD, SNMP_HOST)
- SNMP_L_Bind_MAC_STR(SNMP_PWD, SNMP_HOST)
- SNMP_MAC_2_mac(SNMP_MAC)
- SNMP_生成以VLAN为键以VLAN_IP_MASK列表为值的字典(SNMP_PWD, SNMP_HOST) ## D_VLAN_NET {VLAN_STR:[IP_STR/MASK_STR]}
- SNMP_L_IF_IP_STR(SNMP_PWD, SNMP_HOST)
- SNMP_L_IF_MASK_STR(SNMP_PWD, SNMP_HOST)
- SNMP_L_IF_INDEX_STR(SNMP_PWD, SNMP_HOST)
- SNMP_D_IF_DESCR(SNMP_PWD, SNMP_HOST)
- '''
-
-
- ## 全局变量 交换机 S5700 要用到的 OID
- OID_BIND_IP = '1.3.6.1.4.1.2011.5.25.112.1.14.1.12' # 静态绑定IP列表
- OID_BIND_MAC = '1.3.6.1.4.1.2011.5.25.112.1.14.1.11' # 静态绑定MAC列表
- OID_IF_IP_STR = '1.3.6.1.4.1.2011.5.25.41.1.2.1.1.1' # 设备接口IP列表(VLAN接口、物理端口、特殊接口)
- OID_IF_MASK_STR = '1.3.6.1.4.1.2011.5.25.41.1.2.1.1.3' # 设备接口IP的掩码列表(VLAN接口、物理端口、特殊接口)
- OID_IF_INDEX = '1.3.6.1.4.1.2011.5.25.41.1.2.1.1.2' # 设备接口编号(VLAN接口、物理端口、特殊接口)
- OID_IF_DESCR = 'IF-MIB::ifDescr' # 设备接口名称字典(VLAN接口、物理端口、特殊接口)
-
-
- ## SNMP获取的MAC地址是 '00 00 00 00 00 00' 形式,需要转成 '0000-0000-0000' 形式
- def SNMP_MAC_2_mac(SNMP_MAC):
- MAC = SNMP_MAC.replace(' ', '')
- mac = MAC.lower()
- mac_new = mac[0:4] +'-'+ mac[4:8] +'-'+ mac[8:12]
- return(mac_new)
-
-
- ## 获取静态绑定IP列表
- def SNMP_L_Bind_IP_STR(SNMP_PWD, SNMP_HOST):
- SNMP_OID = OID_BIND_IP
- (N, R) = SNMP_V2_R(SNMP_PWD, SNMP_HOST, SNMP_OID)
- if N == 0:
- if R.split(' = ')[-1] == 'No Such Object available on this agent at this OID':
- ERROR = '错误 OID ' + SNMP_OID
- return(1, ERROR)
- elif R.split(' = ')[-1] == 'No Such Instance currently exists at this OID':
- ## SNMP查询结果为空
- return(0, [])
- else:
- R_SP = R.split('\n')
- L_IP_STR = [i.split(': ')[-1] for i in R_SP]
- return(0, L_IP_STR)
- else:
- ERROR = 'SNMP命令执行失败:' + str(R)
- return(1, ERROR)
-
-
- ## 获取静态绑定MAC列表
- def SNMP_L_Bind_MAC_STR(SNMP_PWD, SNMP_HOST):
- SNMP_OID = OID_BIND_MAC
- (N, R) = SNMP_V2_R(SNMP_PWD, SNMP_HOST, SNMP_OID)
- if N == 0:
- if R.split(' = ')[-1] == 'No Such Object available on this agent at this OID':
- ERROR = '错误 OID ' + SNMP_OID
- return(1, ERROR)
- elif R.split(' = ')[-1] == 'No Such Instance currently exists at this OID':
- ## SNMP查询结果为空
- return(0, [])
- else:
- R_SP = R.split('\n')
- L_MAC_STR = [SNMP_MAC_2_mac(i.split(': ')[-1]) for i in R_SP]
- return(0, L_MAC_STR)
- else:
- ERROR = 'SNMP命令执行失败:' + str(R)
- return(1, ERROR)
-
-
- ## 获取设备接口IP列表(VLAN接口、物理端口、特殊接口)
- def SNMP_L_IF_IP_STR(SNMP_PWD, SNMP_HOST):
- SNMP_OID = OID_IF_IP_STR
- (N, R) = SNMP_V2_R(SNMP_PWD, SNMP_HOST, SNMP_OID)
- if N == 0:
- if R.split(' = ')[-1] == 'No Such Object available on this agent at this OID':
- ERROR = '错误 OID ' + SNMP_OID
- return(1, ERROR)
- elif R.split(' = ')[-1] == 'No Such Instance currently exists at this OID':
- ## SNMP查询结果为空
- return(0, [])
- else:
- R_SP = R.split('\n') ## 先用'\n'(换行符)分出每一条记录
- L_IP_STR = [i.split(': ')[-1] for i in R_SP] ## 再用': '(冒号加一空)分段并提取需要的值(最后一段)
- return(0, L_IP_STR)
- else:
- ERROR = 'SNMP命令执行失败:' + str(R)
- return(1, ERROR)
-
-
- ## 获取设备接口IP的掩码列表(VLAN接口、物理端口、特殊接口)
- def SNMP_L_IF_MASK_STR(SNMP_PWD, SNMP_HOST):
- SNMP_OID = OID_IF_MASK_STR
- (N, R) = SNMP_V2_R(SNMP_PWD, SNMP_HOST, SNMP_OID)
- if N == 0:
- if R.split(' = ')[-1] == 'No Such Object available on this agent at this OID':
- ERROR = '错误 OID ' + SNMP_OID
- return(1, ERROR)
- elif R.split(' = ')[-1] == 'No Such Instance currently exists at this OID':
- ## SNMP查询结果为空
- return(0, [])
- else:
- R_SP = R.split('\n')
- L_IF_MASK_STR = [i.split(': ')[-1] for i in R_SP]
- return(0, L_IF_MASK_STR)
- else:
- ERROR = 'SNMP命令执行失败:' + str(R)
- return(1, ERROR)
-
-
- ## 获取设备接口编号(VLAN接口、物理端口、特殊接口)
- def SNMP_L_IF_INDEX_STR(SNMP_PWD, SNMP_HOST):
- SNMP_OID = OID_IF_INDEX
- (N, R) = SNMP_V2_R(SNMP_PWD, SNMP_HOST, SNMP_OID)
- if N == 0:
- if R.split(' = ')[-1] == 'No Such Object available on this agent at this OID':
- ERROR = '错误 OID ' + SNMP_OID
- return(1, ERROR)
- elif R.split(' = ')[-1] == 'No Such Instance currently exists at this OID':
- ## SNMP查询结果为空
- return(0, [])
- else:
- R_SP = R.split('\n')
- L_IF_VLAN_STR = [i.split(': ')[-1] for i in R_SP]
- return(0, L_IF_VLAN_STR)
- else:
- ERROR = 'SNMP命令执行失败:' + str(R)
- return(1, ERROR)
-
-
- ## 获取设备接口名称字典(VLAN接口、物理端口、特殊接口)
- ## {'1':InLoopBack0, '2':'NULL0', '3':'Console9/0/0', '4':'MEth0/0/1', '5':'Vlanif1', '6':'GigabitEthernet0/0/1'}
- def SNMP_D_IF_DESCR(SNMP_PWD, SNMP_HOST):
- SNMP_OID = OID_IF_DESCR
- (N, R) = SNMP_V2_R(SNMP_PWD, SNMP_HOST, SNMP_OID)
- if N == 0:
- if R.split(' = ')[-1] == 'No Such Object available on this agent at this OID':
- ERROR = '错误 OID ' + SNMP_OID
- return(1, ERROR)
- elif R.split(' = ')[-1] == 'No Such Instance currently exists at this OID':
- ## SNMP查询结果为空
- return(0, {})
- else:
- D_IF_DESCR = {}
- R_SP = R.split('\n') ## 先用'\n'(换行符)分出每一条记录
- L_IF_DESCR = [i.split(': ')[-1] for i in R_SP] ## 再用': '(冒号加一空)分段并提取需要的值(最后一段)
- for i in range(0, len(L_IF_DESCR)):
- D_IF_DESCR[str(i+1)] = L_IF_DESCR[i]
- return(0, D_IF_DESCR)
- else:
- ERROR = 'SNMP命令执行失败:' + str(R)
- return(1, ERROR)
-
-
-
- ## D_IP_MAC
- ## 以IP为键以MAC地址列表为值的字典
- ## {IP_STR:[MAC_STR, MAC_STR]}
- ## {'x.x.x.x':['xxxx-xxxx-xxxx', 'xxxx-xxxx-xxxx']}
- ## {'10.0.0.1':['0000-0000-000a', '0000-0000-000b']}
- def SNMP_生成以IP为键以MAC地址列表为值的字典(SNMP_PWD, SNMP_HOST):
- (N_IP, R_IP) = SNMP_L_Bind_IP_STR(SNMP_PWD, SNMP_HOST)
- (N_MAC, R_MAC) = SNMP_L_Bind_MAC_STR(SNMP_PWD, SNMP_HOST)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 SNMP_L_Bind_IP_STR 调用结果:' + str(N_IP) + str(R_IP))
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 SNMP_L_Bind_MAC_STR 调用结果:' + str(N_MAC) + str(R_MAC))
- if N_IP == 0:
- if N_MAC == 0:
- IP_List = R_IP
- MAC_List = R_MAC
- IP_List_Len = len(IP_List)
- MAC_List_Len = len(MAC_List)
- if IP_List_Len == MAC_List_Len:
- D_IP_MAC = {}
- for i in range(0, IP_List_Len):
- IP = IP_List[i]
- MAC = MAC_List[i]
- if IP not in D_IP_MAC:
- D_IP_MAC[IP] = [MAC]
- else:
- D_IP_MAC[IP].append(MAC)
- return(0, D_IP_MAC)
- else:
- E = '绑定记录IP和MAC不能一一对应,len(IP_List) != len(MAC_List)'
- return(1, E)
- else:
- E = '获取IP成功,获取MAC失败:' + R_MAC
- return(1, E)
- else:
- if N_MAC == 0:
- E = '获取MAC成功,获取IP失败:' + R_IP
- return(1, E)
- else:
- E = '获取IP失败:' + R_IP + ' 获取MAC失败:' + R_MAC
- return(1, E)
-
-
-
- ## D_VLAN_NET
- ## {VLAN_STR:[IP_STR/MASK_STR]}
- ## {'VlanifN':['x.x.x.x/x.x.x.x', 'x.x.x.x/x.x.x.x']}
- def SNMP_生成以VLAN为键以VLAN_IP_MASK列表为值的字典(SNMP_PWD, SNMP_HOST):
- D_VLAN_NET = {}
- (N_IP, R_IP) = SNMP_L_IF_IP_STR(SNMP_PWD, SNMP_HOST)
- (N_MASK, R_MASK) = SNMP_L_IF_MASK_STR(SNMP_PWD, SNMP_HOST)
- (N_INDEX, R_INDEX) = SNMP_L_IF_INDEX_STR(SNMP_PWD, SNMP_HOST)
- (N_DESCR, R_DESCR) = SNMP_D_IF_DESCR(SNMP_PWD, SNMP_HOST)
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 SNMP_L_IF_IP_STR 调用结果:' + str(N_IP) + str(R_IP))
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 SNMP_L_IF_MASK_STR 调用结果:' + str(N_MASK) + str(R_MASK))
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 SNMP_L_IF_INDEX_STR 调用结果:' + str(N_INDEX) + str(R_INDEX))
- Log.debug('函数 ' + sys._getframe().f_code.co_name + ' 的 SNMP_D_IF_DESCR 调用结果:' + str(N_DESCR) + str(R_DESCR))
- if N_IP == N_MASK == N_INDEX == N_DESCR == 0:
- if len(R_IP) == len(R_MASK) == len(R_INDEX):
- for i in range(0, len(R_IP)):
- VLAN = R_INDEX[i]
- IP = R_IP[i]
- MASK = R_MASK[i]
- if VLAN in R_DESCR:
- KEY = R_DESCR[VLAN]
- VALUE = IP + '/' + MASK
- if KEY not in D_VLAN_NET:
- D_VLAN_NET[KEY] = [VALUE]
- else:
- D_VLAN_NET[KEY].append(VALUE)
- else:
- ERROR = 'VLAN ' + VLAN + ' 在设备中没有找到'
- return(1, ERROR)
- return(0, D_VLAN_NET)
- else:
- ERROR = 'IP MASK VLAN 数量不一致'
- return(1, ERROR)
- else:
- ERROR = '有错误 '
- if N_IP != 0:
- ERROR += R_IP
- if N_MASK != 0:
- ERROR += R_MASK
- if N_INDEX != 0:
- ERROR += R_INDEX
- if N_DESCR != 0:
- ERROR += R_DESCR
- return(1, ERROR)
-
-
-
-
-
文件四代码:
- #_*_ coding:utf8 _*_
- import paramiko # SSH 操作模块
- import time # 计时
-
-
- ## SSH_LOGIN
- ## 登录SSH设备成功不关闭会话,返回ssh连接对象和交互子shell对象,后续可以在登录状态继续操作
- def SSH_LOGIN(D_LOGIN_INFO):
- SSH_IP = D_LOGIN_INFO['SSH_IP']
- SSH_PORT = D_LOGIN_INFO['SSH_PORT']
- SSH_USER = D_LOGIN_INFO['SSH_USER']
- SSH_PASS = D_LOGIN_INFO['SSH_PASS']
- SSH = paramiko.SSHClient()
- SSH.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- try:
- SSH.connect(SSH_IP, SSH_PORT, SSH_USER, SSH_PASS) # 尝试登录SSH设备
- except Exception as e: # 登录失败
- E = '函数 SSH_LOGIN() 登录SSH设备失败 '+ str(e)
- return(1, E) # 返回错误代码1和失败原因
- else: # 登录成功
- SHELL = SSH.invoke_shell() # 使用伪终端(子shell)(交互操作界面需要用到)
- return(0, SSH, SHELL) # 返回成功代码0,ssh连接对象,交互子shell对象
-
-
- ## 函数 SHELL_CMD() 执行交互命令返回命令回显字符串
- def SHELL_CMD(SHELL, CMD, TIME=2, BUFF=10240): # 在交互子SHELL里执行操作命令并预设默认参数
- SHELL.sendall(str(CMD) + '\n') # 把命令转成字符串,防止类型不同不能相加
- time.sleep(TIME) # 命令执行后等待足够的时间(默认2秒)以接收回显内容
- B_DATA = SHELL.recv(BUFF) # 保存接收到的回显内容(字节码类型)(接收最大值默认为10240字节)
- S_DATA = B_DATA.decode('utf-8') # 回显内容转成字符串类型(使用UTF8编码)
- return(S_DATA)
-
-
- ## 函数 SHELL_CTRL() 执行控制命令返回命令回显字符串
- def SHELL_CTRL(SHELL, CTRL, TIME=2, BUFF=10240):
- SHELL.sendall(CTRL)
- time.sleep(TIME)
- B_DATA = SHELL.recv(BUFF)
- S_DATA = B_DATA.decode('utf-8')
- return(S_DATA)
-
文件五代码:
- #_*_ coding:utf8 _*_
- import subprocess # 调用操作系统命令
- import logging # 日志模块
- ## 如果日志实例不存在,就使用本程序日志模块
- Log = logging.getLogger("__name__")
- if not logging.FileHandler:
- formatter = logging.Formatter('%(asctime)s %(levelname)-8s : %(message)s') # 指定logger输出格式
- file_handler = logging.FileHandler("DEV_SNMP.log") # 日志文件路径
- file_handler.setFormatter(formatter) # 可以通过setFormatter指定输出格式
- Log.addHandler(file_handler)
- Log.setLevel(logging.DEBUG)
-
- '''
- 说明:以下函数是通过 Python3 subprocess 模块调用系统命令实现操作SNMP的功能
- 前提条件1:系统需要先安装好SNMP工具,有查询和修改SNMP的命令
- 安装SNMP命令:以 CentOS7 系统为例
- yum install net-snmp-utils # 安装SNMP工具
- 安装后就会有如下命令
- snmpbulkwalk # 用于SNMP查询值
- snmpset # 用于SNMP修改值
- 前提条件2:被管理设备上要开启SNMP功能,以华为模拟器S5700交换机为例:
- system-view # 进入配置模式
- snmp-agent # 配置SNMP
- snmp-agent sys-info version v2c # 设置SNMP版本
- snmp-agent community read pwd@123 # 设置只读SNMP密码
- snmp-agent community write pwd@456 # 设置修改SNMP密码
- snmp-agent udp-port 161 # 设置SNMP端口,默认就是 UDP 161
- '''
-
-
- ## SNMP v2c 版本查询功能
- def SNMP_V2_R(SNMP_PWD, SNMP_HOST, SNMP_OID):
- SNMP_CMD = 'snmpbulkwalk -v 2c -c ' + SNMP_PWD + ' ' + SNMP_HOST + ' ' + SNMP_OID
- RR = subprocess.getstatusoutput(SNMP_CMD)
- Log.debug('函数 SNMP_V2_R 执行了系统命令 ' + SNMP_CMD) ## DEBUG
- Log.debug('命令执行结果 ' + str(RR)) ## DEBUG
- return(RR)
-
- ## SNMP v2c 版本设置功能
- def SNMP_V2_W(SNMP_PWD, SNMP_HOST, SNMP_OID, SNMP_TYPE, SNMP_VALUE):
- SNMP_CMD = 'snmpset -v 2c -c '+ SNMP_PWD +' '+ SNMP_HOST +' '+ SNMP_OID +' '+ SNMP_TYPE +' '+ SNMP_VALUE
- RR = subprocess.getstatusoutput(SNMP_CMD)
- Log.debug('函数 SNMP_V2_W 执行了系统命令 ' + SNMP_CMD) ## DEBUG
- Log.debug('命令执行结果 ' + str(RR)) ## DEBUG
- return(RR)
结构示意图:
说明文件:
########################################################################################
## (业务层)
## 管理任务实现业务(根据任务结果处理业务逻辑)
## 如WEB端、命令行端、交互控制端等根据实际业务需要调用任务函数完成业务功能
## 调用 DEF_SSH_eNSP_Switch_S5700.py 里的任务函数执行SSH交互操作,获取任务执行状态和结果
## 调用 DEF_SNMP_eNSP_Switch_S5700.py 里的任务函数执行SNMP操作,获取任务执行状态和结果
## 调用 数据库任务,完成数据记录更新
########################################################################################
## 只操作交换机
Main_Switch
交互操作测试
#############################################################
## (任务层)
## 管理执行实现任务(根据执行结果处理任务逻辑)
## 向上级返回任务完成结果
## 示例:交换机应用功能的实现函数(华为eNSP模拟器S5700交换机)
#############################################################
## 通过SSH交互操作交换机
DEF_SSH_eNSP_Switch_S5700.py
from DEV_SSH import * # 需要导入DEV_SSH.py文件里的SNMP操作函数
import struct # 用于解析、打包字节码
import re # 用于匹配
import logging # 日志模块
import sys # 本文件中只用于获取函数名方便生成日志内容
TEST_D_LOGIN_INFO(D_LOGIN_INFO) # 检查登录信息字典是否合法
TEST_IP_STR(IP_STR) # 检查IP是否合法
TRY_MASK_2_MASK_INT(MASK) # 尝试把掩码转成掩码位数
IP_INT_2_IP_STR(IP_INT) # IP_INT 转 IP_STR
IP_STR_2_IP_INT(IP_STR) # IP_STR 转 IP_INT
根据地址和掩码计算起始和结束地址(IP_STR, MASK_INT) # 根据地址和掩码计算起始和结束地址
IP_MASK_2_IP_INT_LIST(IP_STR, MASK_INT) # 根据IP地址和掩码位数生成IP所在网段的全部IP_INT地址列表(列表从小到大顺序)
IP_MASK_2_IP_STR_LIST(IP_STR, MASK_INT) # 根据IP地址和掩码位数生成IP所在网段的全部IP_STR地址列表(列表从小到大顺序)
MAC缓存信息转成MAC对应VLAN字典(S_DATA)
VLAN和NET字典(S_DATA) # {'Vlanif10': ['192.168.10.1/24'], 'Vlanif20': ['192.168.20.1/24']}
VLAN和IP字典(S_DATA) # {'Vlanif10': ['192.168.10.1'], 'Vlanif20': ['192.168.20.1']}
TRY_NET_STR_2_IP_MASK(NET_STR) # 尝试把网段表示格式:'IP/MASK' 格式转成元组 (IP_STR, MASK_INT)
TRY_MAC_2_MAC(MAC_STR, 转换格式='xxxx-xxxx-xxxx') # 尝试把MAC地址转成指定格式
返回查看模式(S_DATA, SHELL, 最大深度=5)
IP_MAC_绑定信息转成IP为KEY字典(S_DATA) # 从查询静态绑定信息命令回显中提取以IP为KEY以MAC为值的字典
IP_MAC_绑定信息转成MAC为KEY字典(S_DATA) # 从查询静态绑定信息命令回显中提取以MAC为KEY以IP为值的字典
判断是否绑定成功(D, IP, MAC) # 在IP和MAC对应字典中是否出现IP和MAC绑定记录
自动查找网段中可用IP_优先自增(NET_IP_STR, NET_MASK_INT, S_DATA, 网段开头保留地址数量=1, 网段末尾保留地址数量=1)
自动查找网段中可用IP_自动填充(NET_IP_STR, NET_MASK_INT, S_DATA, 网段开头保留地址数量=1, 网段末尾保留地址数量=1)
## 任务函数,传入要登录的设备信息和任务必须的参数
## 任务函数分2部分
## 第1部分 任务控制函数(用于检查参数,控制SSH连接的打开和关闭,调用任务实现函数)
## 第2部分 任务实现函数(用于执行交互操作命令,根据回显结果进行处理,返回任务处理状态和处理结果,方便相同功能复用代码)
保存配置(D_LOGIN_INFO)
SHELL_SAVE(SSH, SHELL)
获取IP和MAC绑定信息(D_LOGIN_INFO)
SHELL_D_IP_MAC(SSH, SHELL)
绑定指定IP和指定MAC(D_LOGIN_INFO, Bind_IP, Bind_MAC)
SHELL_Bind_IP_MAC(SSH, SHELL, Bind_IP, Bind_MAC)
在指定网段自动选择可用IP和指定MAC绑定(D_LOGIN_INFO, Bind_NET, Bind_MAC)
SHELL_Bind_NET_MAC(SSH, SHELL, Bind_NET_IP_STR, Bind_NET_MASK_INT, Bind_MAC)
查找MAC地址所在网段(D_LOGIN_INFO, MAC)
SHELL_MAC_IN_VLAN(SSH, SHELL, MAC)
在MAC所在网段自动选择可用IP和进行绑定(D_LOGIN_INFO, Bind_MAC)
SHELL_AUTO_Bind_MAC(SSH, SHELL, Bind_MAC)
解除指定IP和指定MAC的绑定(D_LOGIN_INFO, Undo_Bind_IP, Undo_Bind_MAC)
SHELL_UNDO_Bind_IP_MAC(SSH, SHELL, Undo_Bind_IP, Undo_Bind_MAC)
解除指定IP的全部MAC绑定(D_LOGIN_INFO, Undo_Bind_IP)
SHELL_UNDO_Bind_IP(SSH, SHELL, Undo_Bind_IP)
解除指定MAC的全部IP绑定(D_LOGIN_INFO, Undo_Bind_MAC)
SHELL_UNDO_Bind_MAC(SSH, SHELL, Undo_Bind_MAC)
## 通过SNMP操作交换机
DEF_SNMP_eNSP_Switch_S5700.py
from DEV_SNMP import * # 需要导入DEV_SNMP.py文件里的SNMP操作函数
import sys # 本文件中只用于获取函数名方便生成日志内容
import logging # 日志模块
SNMP_MAC_2_mac(SNMP_MAC) ## SNMP获取的MAC地址是 '00 00 00 00 00 00' 形式,需要转成 '0000-0000-0000' 形式
SNMP_L_Bind_IP_STR(SNMP_PWD, SNMP_HOST) ## 获取静态绑定IP列表
SNMP_L_Bind_MAC_STR(SNMP_PWD, SNMP_HOST) ## 获取静态绑定MAC列表
SNMP_L_IF_IP_STR(SNMP_PWD, SNMP_HOST) ## 获取设备接口IP列表(VLAN接口、物理端口、特殊接口)
SNMP_L_IF_MASK_STR(SNMP_PWD, SNMP_HOST) ## 获取设备接口IP的掩码列表(VLAN接口、物理端口、特殊接口)
SNMP_L_IF_INDEX_STR(SNMP_PWD, SNMP_HOST) ## 获取设备接口编号(VLAN接口、物理端口、特殊接口)
SNMP_D_IF_DESCR(SNMP_PWD, SNMP_HOST) ## 获取设备接口名称字典(VLAN接口、物理端口、特殊接口)## {'1':InLoopBack0, '2':'NULL0', '3':'Console9/0/0', '4':'MEth0/0/1', '5':'Vlanif1', '6':'GigabitEthernet0/0/1'}
SNMP_生成以IP为键以MAC地址列表为值的字典(SNMP_PWD, SNMP_HOST) ## 以IP为键以MAC地址列表为值的字典 {IP_STR:[MAC_STR, MAC_STR]}
SNMP_生成以VLAN为键以VLAN_IP_MASK列表为值的字典(SNMP_PWD, SNMP_HOST) ## D_VLAN_NET {VLAN_STR:[IP_STR/MASK_STR]}
###################################################
## (执行层)
## 根据任务需要操作具体设备,并返操作状态和操作结果
###################################################
## 操作SSH交互设备
DEV_SSH.py # SSH登录设备执行命令并返回命令结果的函数文件
import paramiko # SSH 操作模块(第三方模块 pip install paramiko 安装)
import time # 时间模块
SSH_LOGIN(D_LOGIN_INFO) # 尝试登录设备,登录成功返回SSH连接对象和交互子SHELL对象
SHELL_CMD(SHELL, CMD, TIME=2, BUFF=10240) # 执行交互命令返回命令回显字符串
SHELL_CTRL(SHELL, CTRL, TIME=2, BUFF=10240) # 在交互界面发送控制符(未完成)
## 操作SNMP设备
DEV_SNMP.py(目前只完成Linux环境下应用)
import subprocess # 调用操作系统命令
import logging # 日志模块
SNMP_V2_R(SNMP_PWD, SNMP_HOST, SNMP_OID) # 读取SNMP设备信息
SNMP_V2_W(SNMP_PWD, SNMP_HOST, SNMP_OID, SNMP_TYPE, SNMP_VALUE) # 修改SNMP设备配置
##################
## 变量含义说明 ##
##################
## 变量前缀后缀
B 字节
S 字符串
L 列表
T 元组
D 字典
P 集合
INT 数字
STR 字符串
R 接收函数全部返回内容为一个变量 R = 函数()
D_LOGIN_INFO = {SSH_IP:'192.168.0.1', SSH_PORT:22, SSH_USER:'abc', SSH_PASS:'pwd@123'} # 字典类型登录信息
SSH_IP # D_LOGIN_INFO 的 KEY 其值为 登录地址(字符串)
SSH_PORT # D_LOGIN_INFO 的 KEY 其值为 设备端口(数字)
SSH_USER # D_LOGIN_INFO 的 KEY 其值为 登录用户(字符串)
SSH_PASS # D_LOGIN_INFO 的 KEY 其值为 登录密码(字符串)
B_DATA # 执行交互命令后回显的内容(字节)
S_DATA # 执行交互命令后回显的内容(字符串)
网段开头保留地址数量 # 一般第一个IP做网关,或者前面保留一些IP地址
网段末尾保留地址数量 # 一般最后一个做网关,或者后面保留一些IP地址
D # 字典,D_IP_MAC 或 D_MAC_IP 字典
IP # 临时变量名 或 任意格式IP地址(IP_STR 或 IP_INT)
IP_STR # 点分十进制字符串类型IP地址(字符串)
IP_INT # 4字节无符号整数类型IP地址(数字)
MASK # 临时变量名 或 任意格式掩码(MASK_STR 或 MASK_INT)
MASK_STR # 点分十进制字符串类型掩码(字符串)'255.255.255.0'
MASK_INT # 掩码长度(掩码位数)(数字)
MAC # 临时变量名 或 任意格式MAC地址(字符串)3段6段大写小写
mac_H_SP3 # 连字符分3段16进制小写(字符串)'xxxx-xxxx-xxxx'(hyphen)
MAC_H_SP3 # 连字符分3段16进制大写(字符串)'XXXX-XXXX-XXXX'(hyphen)
mac_H_SP6 # 连字符分6段16进制小写(字符串)'xx-xx-xx-xx-xx-xx'(hyphen)
mac_H_SP6 # 连字符分6段16进制大写(字符串)'XX-XX-XX-XX-XX-XX'(hyphen)
mac_C_SP6 # 冒号符分6段16进制小写(字符串)'xx:xx:xx:xx:xx:xx'(colon)
MAC_C_SP6 # 冒号符分6段16进制大写(字符串)'XX:XX:XX:XX:XX:XX'(colon)
D_IP_MAC # 以 IP_STR 为键以 MAC_STR_SP3 为值的字典
D_MAC_IP # 以 MAC_STR_SP3 为键以 IP_STR 为值的字典
IP_MASK # 网段表示格式(字符串)例:'192.168.0.0/255.255.255.0' 或 '192.168.0.0/24'
NET_STR # 网段表示格式(字符串)例:'192.168.0.0/255.255.255.0' 或 '192.168.0.0/24'
T_NET (IP_STR, MASK_INT)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。