赞
踩
一、背景
最近在实施变更时需要采集多台设备信息,用于在实施完毕后对设备状态进行检查。由于人工采集比较耗时,在网上找了一下没找到自动采集的脚本,就用python写了个网络设备信息自动化信息采集脚本,在实施变更前后抓取相关设备信息,实施完成后自动对比前后变化情况。
二、使用提示
1、将设备信息填写在dev_ins.csv表格(表格内容见下图),只支持ssh,思科设备需要输入enable的在enpass列输入enable密码,不需要则不写
2、需要采集信息命令在对应设备下方填写
3、第一条命令为将设备修改全部输出不限制输出行数,否则输出信息不完整
4、目前只适配了Cisco_IOS、H3C_SW、H3C_FW,如有其他设备需求可自行增加
在表格增加相关命令,在代码修改如下变量(fac\cmd),cmdread[9] 数字9与表格列数相对应(从0开始数)
在代码中修改如下变量:
if fac == "H3C_SW":
for cmdread in islice(cmd_file, 1, None):
cmd += cmdread[9] + chr(10)
5、csv表格内不要写中文
6、csv表格与脚本需要放在同一个文件夹执行
7、第一次采集与第二次采集时间如果不是同一天,在信息比对时需要修改文件名称为同一天
8、第一次采集与第二次采集信息比对时,是通过csv表格里面的IP地址来确认比对对象
9、测试环境python3.9
构造如下csv表格:
完整代码如下:
import csv import os import paramiko import time import sys import difflib import datetime from itertools import islice from datetime import datetime, date, timedelta now = str(date.today()) filename = 'dev_ins.csv' # 设备信息、执行命令表格名称 path_before = 'C:/check/collect_info/' + now + '/' + 'before' # 第一次采集信息保存路径 path_after = 'C:/check/collect_info/' + now + '/' + 'after' # 第二次采集信息保存路径 path_compare = 'C:/check/result_comp/' # 第一次第二次采集信息比对结果保存路径 def check_path(): isExists_before = os.path.exists(path_before) isExists_after = os.path.exists(path_after) isExists = os.path.exists(path_compare) if not isExists_before: os.makedirs(path_before) if not isExists_after: os.makedirs(path_after) if not isExists: os.makedirs(path_compare) def ssh_login(): cache = [] global host global fac global login global user global passw global enpass global ssh global channel set_interval_time() print('开始采集设备信息....') cmd_file = csv.reader(open(filename, 'r', encoding='UTF-8')) for line in islice(cmd_file, 1, None): if line[0] != "": cache.append(line[0:6]) for hostread in cache: host = hostread[0] fac = hostread[1] login = hostread[2] user = hostread[3] passw = hostread[4] enpass = hostread[5] ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) try: ssh.connect(hostname=host, port=22, username=user, password=passw, timeout=3) except BaseException as e: print('{0} 登录失败,请确认账户密码\网络连通性!!!'.format(host), e, '\n') log = host + '登录失败,请确认账户密码\网络连通性!!!\t' + str(e) + '\t' + str(datetime.now())+'\n' with open(path_before + '/' + 'faillog.txt', "a+", encoding='UTF-8') as rfile: rfile.write(log) continue channel = ssh.invoke_shell() print(host + ' ' + 'collecting' + ' ' + 'information' + ' ' + 'start') collect_info() format_txt() print('信息采集完成,保存路径:{0}/'.format(file_save_path)) input('按任意键返回菜单') def collect_info(): cmd = '' global cmdread channel.send(chr(13)) cmd_file = csv.reader(open(filename, 'r', encoding='UTF-8')) if fac == "Cisco_IOS": # 如果dev_ins.csv表格中enpass列不为空则输入enable密码 if enpass: channel.send('enable' + chr(13)) channel.send(enpass + chr(13)) for cmdread in islice(cmd_file, 1, None): # 读取表格中的命令到cmd变量 cmd += cmdread[8] + chr(10) ssh_exec(cmd) # 执行cmd命令 if fac == "H3C_SW": for cmdread in islice(cmd_file, 1, None): cmd += cmdread[9] + chr(10) ssh_exec(cmd) if fac == "H3C_FW": for cmdread in islice(cmd_file, 1, None): cmd += cmdread[10] + chr(10) ssh_exec(cmd) ssh.close() return def ssh_exec(cmd): global file_save_path if flage == 1: file_save_path = path_before elif flage == 2: file_save_path = path_after if cmd: channel.send(cmd) time.sleep(interval_time) # 执行cmd命令后等待结果输出,时间间隔设置太短会导致输出不完整, # 程序就结束了,通常配置不是太多的情况下3S内即可 output = channel.recv(4096000).decode(encoding='gbk') # 输出结果里面包含中文,编码用的gbk if cmdread[8] not in output: # 检查最后一条命令是否在输出结果里面 print('采集配置不完整,请调大间隔时间后重试。') sys.exit() with open(file_save_path + '/' + host + ".txt", "a+", encoding='gbk') as rfile: rfile.write(output) print(host + ' ' + 'collecting' + ' ' + 'information' + ' ' + 'stop' + '\n') def set_interval_time(): global interval_time fla = 0 interval_time = 2 answer = input('是否修改命令执行间隔时间(默认为{0}s)y/n:'.format(interval_time)) while True: if fla == 1: break if answer == 'y': try: interval_time = float(input('请输入间隔时间(s):')) except BaseException: print('输入有误,请重新输入!') else: fla = 1 else: fla = 1 return def format_txt(): # 输出结果空行太多,将空行去掉 global comp_path if flage == 1: comp_path = path_before elif flage == 2: comp_path = path_after with open(comp_path + '/' + host + '.txt', "r", encoding='gbk') as rfile: line = rfile.read() line = line.replace('\n', '&') with open(comp_path + '/' + host + '.txt', "w", encoding='gbk') as dfile: line = line.replace('&&&', '\n') line = line.replace('&&', '\n') line = line.replace('&', '\n') dfile.write(line) return def comp_read_file(file_name): try: file_desc = open(file_name, 'r', encoding='utf-8') text = file_desc.read().splitlines() file_desc.close() return text except IOError as error: print('文件读取失败,请确认文件是否存在!{0}'.format(error)) input('按任意键退出。') sys.exit() def comp_file(file1, file2): global result_fiel if file1 == "" or file2 == "": print('文件不存在,请先收集信息!\n第一个文件的路径:{0}, 第二个文件的路径:{1} .'.format(file1, file2)) sys.exit() text1_lines = comp_read_file(file1) text2_lines = comp_read_file(file2) diff = difflib.HtmlDiff() result = diff.make_file(text1_lines, text2_lines, context=True, charset='utf-8') today = str(date.today()) result_fiel = path_compare + comp_ip + '-' + today + '.html' try: with open(path_compare + '/' + comp_ip + '-' + today + '.html', 'w', encoding='UTF-8') as result_file: # 输出检查结果 result_file.write(result) except IOError as error: print('写入错误:{0}'.format(error)) def compare(): cache = [] global comp_ip host_file = csv.reader(open(filename, 'r', encoding='UTF-8')) for line in islice(host_file, 1, None): if line[0] != "": cache.append(line[0]) print("开始对比设备信息....") for comp_ip in cache: today = str(date.today()) path_today = 'C:/check/collect_info/' + today + '/' file_before = path_today + 'before' + '/' + comp_ip + '.txt' file_after = path_today + 'after' + '/' + comp_ip + '.txt' print('正在检查:' + comp_ip) print('文件1名称:' + file_before) print('文件2名称:' + file_after + '\n') comp_file(file_before, file_after) print("对比完毕,结果见附件:{0}".format(result_fiel)) input('按任意键返回菜单') def menu(): print('==============网络设备基线配置采集===============') print('(请确认需要采集设备信息是否已导入dev_ins.csv表格)') print('1、实施前设备配置、状态信息采集') print('2、实施后设备配置、状态信息采集') print('3、对比实施前后设备配置、状态') print('0、退出') print('==============================================') def main(): check_path() global flage while True: menu() try: flage = int(input('请选择:')) except BaseException: continue if flage == 1 or flage == 2: ssh_login() elif flage == 3: compare() elif flage == 0: break else: continue return if __name__ == "__main__": main()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。