赞
踩
前端时间忙完了安全漏洞修复相关的工作后可算是闲下来一些了,想着写点什么,正好遇到客户放强推堡垒机,以后VPN这些连接环境的手段就无法使用了,而客户这边的堡垒机访问web页面的体验那真是一言难尽,所以为了能够方便做一些日常的运维、启停的操作,想写一个命令行工具,用来对接ambari,开个坑先,如果内部团队用的好的话,说不定以后再把这个继续做下去:D;
本身就对Python比较熟悉,所以就拿Python来写了,而且在Github看到之前Ambari官方也尝试用Python做一个Ambari的命令行管理项目,不过后来搁浅了,大概是因为命令行去做一些集群变更操作确实比较不好做,像我这种基础的运维需求,命令行应该是完全能够做到的。
首先把想到的需要的功能先列出来,运维角度来说,最主要的肯定是信息的查看:
除此之外,下一步准备增加组件的管理,最基础的当然就是服务启停:
多方查了资料,确定了主要需要使用的三方库:
除此之外肯定还有requests等库,因为都是标准库,就不提了;
当前的连接配置是通过配置文件进行,结构如下:
[global]
# 全局配置,填写ambari的IP信息和连接信息
ambari_ip=1.0.0.1
ambari_port=8080
# 如果ambari开启了https访问,需要对应开启
https_enable=false
username=admin
password=admin
[advance]
# 这一部分是高级配置,我们环境会结合Prometheus进行组件和主机的指标监控
prometheus_url=1.0.0.1
http_enable=false
加载配置
解析配置文件没什么问题,有一个需要注意的点是,如果配置文件中包含一些特殊字符(例如#,configparser.ConfigParser解析器会把#后面的内容当成备注),这个时候就必须使用configparser.RawConfigParser()
# -*- coding:utf-8 -*- #!/usr/bin/python3 # 解析Conf.ini配置文件 import configparser import os class Conf(object): def __init__(self, filename): if not os.path.exists(filename): print("The {file} is not exists. ".format(file=filename)) exit(1) self.filename = filename self.conf = configparser.RawConfigParser() def read_conf(self): self.conf.read(self.filename) def get_all_conf_json(self): d = dict() for s in self.conf.sections(): ky = dict() for i in self.conf.items(s): ky[i[0]] = i[1] d[s] = ky self.conf_json = d def get_the_conf_json(self, section): if "conf_json" in self.__dict__: # 如果已经调用过all_conf_json, 则不需要单独跑了 return self.conf_json[section] ky = dict() for i in self.conf.items(section): ky[i[0]] = i[1] return {section: ky}
首先,这是一个交互式的工具,所以,窗口是一个继承与CMD2的主类:
import cmd2
class AmbariShell(cmd2.Cmd):
def __init__(self, *args, **kwargs):
# 必须执行super().__init__(),否则终端会异常退出
super().__init__(*args, auto_load_commands=False, **kwargs)
人靠衣装马靠鞍,咱接下来要有个好看的门面,也就是启动工具后的欢迎信息,这里不对rich包的用法做太多说明,以后有时间专门开个坑:
def welcome(self) -> None:
msg = Text.assemble(
("Welcome to Ambarishell-Python! \n", "bold green"),
("The tools created by Lijiadong.\n", "green"),
"The current config ambari server url is ",
(self.ambari_url, "bold white underline"),
("\nLogin user name is {username}.\n".format(
username=self.username)),
("If the connection information is correct, execute the 'connect' command to establish the connection.\n", "bold blue"),
("Hive a nice day. :D")
)
panel = Panel(msg, title="AmbariShell Python", width=120, padding=1)
rprint(panel)
Rich的Text.assemble支持将多个tuple拼接起来,tuple第一个元素是文本内容,第二个元素是样式内容;Panel是标签样式,最终的实现结果是这样:
由于该工具必然会有大量get的请求操作,使用requests.session建立会话是很有必要的,防止每次都带上认证的请求头,因此,增加一个connect命令,进入交互界面后,必须手动建立连接,建立连接的同时,会对当前的cookies进行校验,如果没有cookies,则直接连接,连接状态码返回如果不是200,也会提示错误,程序退出;
需要注意的是,cmd2会识别do开头的类方法,比如do_connect,在命令行可以直接输入connect调用这个方法:
def do_connect(self, line):
if 'session' in self.__dict__ and len(self.session.cookies.values()) != 0:
rprint("[blue]The connection has been established and no further operation is required. [/blue]")
return
self.session = requests.Session()
code = self.session.get(urljoin(self.ambari_url, "api/v1/users/admin/authorizations"), auth=(self.username, self.password)).status_code
if code != 200:
rprint("[red]Login to Ambari failed.\nPlease check your connect config or Ambari Server status.\n AmbariShell will exit with code 2. [/red]")
self.exit_code = 2
return True
rprint("[green]Login to Ambari successful.[/green]")
此时连接已经建立了,为了方便我后续其他操作的编写,所以我此处的思路是保存必要信息,比如Ambari上的主机清单、组件清单、服务清单,这些基本都是运维时的固定资产,不会变动,所以可以直接把元数据保存起来:
# 连接成功后,获取必要的信息,存入变量,方便后面的动态调用 req = self.session.get(urljoin(self.ambari_url, "api/v1/clusters?fields=Clusters/version,Clusters/total_hosts")).json()['items'][0]['Clusters'] self.cluster = req['cluster_name'] self.hdp_version = req['version'] self.hosts_num = req['total_hosts'] req = self.session.get(urljoin(self.ambari_url, "api/v1/hosts")).json()['items'] self.services_list=dict() self.hosts_list = [item['Hosts']['host_name'] for item in req] self._detail_hosts = DetailHosts(self.session, self.cluster, self.hosts_list, self.ambari_url) # 用于装载hosts变量 req = self.session.get(urljoin(self.ambari_url, "api/v1/clusters/{cluster}/components".format(cluster=self.cluster))).json()['items'] for item in req: service_name = item['ServiceComponentInfo']['service_name'] component_name = item['ServiceComponentInfo']['component_name'] if service_name not in list(self.services_list.keys()): self.services_list[service_name] = [] self.services_list[service_name].append(component_name)
大功告成,让我运行一下这第一个命令,建立和Ambari的连接;
当第一次建立连接并成功后,会打印绿色的连接成功的信息,如果连接已经建立,会打印蓝色的提示信息,不作任何操作:
如果连接失败,则会直接退出:
上面已经完成了第一个命令——建立连接,接下来编写第一个实际的操作命令——show命令,这个命令将会包含4个用法:
由上可知,show命令应该允许四个固定的参数,在cmd2中要用装饰器添加这些参数,在方法前带上以下的内容,add_argument就是添加了一个type类型的参数,可选值为’clusters’, ‘hosts’, ‘services’, ‘components’,这里的type根据自己的需要去给,并不是必须是type:
show_load_parser = cmd2.Cmd2ArgumentParser()
show_load_parser.add_argument('type', choices=['clusters', 'hosts', 'services', 'components'])
@cmd2.with_argparser(show_load_parser)
@cmd2.with_category("Command Loading")
定义show命令的方法,这里用了几种rich的展示方式,如Table、Console、Panel
def do_show(self, ns: argparse.Namespace): if 'session' not in self.__dict__.keys(): rprint("[red]No session is established. Please execute the 'connect' command to establish the connection.[/red]") return console = Console() if ns.type == 'clusters': # 显示ambari托管的集群列表 table = Table() for column in ['CLUSTER', 'VERSION', "HOST_NUMS"]: table.add_column(column) table.add_row(self.cluster, self.hdp_version, str(self.hosts_num)) console.print(table) elif ns.type == 'hosts': # 显示所有的hosts req = self.session.get(urljoin(self.ambari_url, "api/v1/hosts?fields=Hosts/ip,Hosts/host_name,Hosts/os_type,Hosts/ph_cpu_count,Hosts/total_mem,Hosts/host_state,Hosts/host_status")).json()['items'] host_panel = [Panel(self.hosts_format_to_panel(item['Hosts']), expand=True, box=HEAVY) for item in req] console.print(Columns(host_panel, align='center')) elif ns.type == 'services': # 列出所有service清单 req = self.session.get(urljoin(self.ambari_url, "api/v1/clusters/{cluster}/services?fields=ServiceInfo/state,ServiceInfo/kerberos_enabled,ServiceInfo/maintenance_state".format(cluster=self.cluster))).json()['items'] table = Table() for column in ['SERVICES', 'STATE', 'KERBEROS', 'MAINTENANCE']: table.add_column(column) for item in req: item = item['ServiceInfo'] service = item["service_name"] kerberos = "[bold green]OPEN[/bold green]" if item['kerberos_enabled'] else "[bold]CLOSE[/bold]" state = "" # 判断服务状态 if item['state'] == "STARTED": state = "[bold green]STARTED[/bold green]" elif item['state'] == "INSTALLED": state = "[bold]STOPPED[/bold]" else: state = "[bold red]BAD[/bold red]" maintenance = "[bold green]CLOSED[/bold green]" if item['maintenance_state'] == 'OFF' else "[bold]OPEN[/bold]" table.add_row(service, state, kerberos, maintenance) console.print(table) elif ns.type == 'components': # 以卡片形式列出每个service下的components components = [] for s, c in self.services_list.items(): # p = Panel("\n".join(c), expand=False,title=s,box=HEAVY) # components.append(p) t = Table() t.add_column(s, justify="center") # 居中显示 t.add_row("\n".join(c)) components.append(t) console.print(Columns(components, align='center', equal=True))
集群和主机信息显示:
组件和服务信息:
当前工具已经可以检查集群的基本信息,下一篇文章将会实现detail命令,用来呈现具体节点的操作系统指标或者组件实例指标,这一部分会引入prometheus,获取更多的监控性能指标,丰富工具的功能。
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。