当前位置:   article > 正文

基于Kafka的nginx日志收集分析与监控平台

基于Kafka的nginx日志收集分析与监控平台

文章目录

1、项目环境

搭建此次项目的主要用到的技术软件:centos7(三台)、nginx 、kafka、zookeeper 、mysql、celery、filebeat 、python。

2、项目描述

项目主要是关于使用filebeat作为生产者收集前端nginx集群中用户访问nginx web页面时产生的access日志,然后将收集到的日志信息统一存入kafka平台,然后编写python消费者程序对获取到的nginx日志做清洗,解析出日志中ip所在的省份、运营商、带宽等,获取流量信息之后存入mysql数据库中;并通过设置阈值,达到每分钟监控报警。

3、项目目标

主要是防止某个省份的某个运营商的流量突然增大,导致服务器异常,起到监控预警的作用。

4、项目步骤

4.1 项目框架在这里插入图片描述在这里插入图片描述

4.1.1 项目总体流程

分为几个部分:用户访问——>nginx负载均衡——>应用集群(nginx web服务)——>kafka集群(基于zookeeper)——>python消费者程序——>存入mysql数据库

4.1.2 项目详细流程

总体详细流程说明(以个人理解的,如有理解错误的地方请大家帮我指出):
1、用户访问:测试用户通过域名或者代理集群中的某个ip地址来访问web集群中web服务。

2、代理集群:在用户输入www.sc.com之后,dns会给用户一个负载均衡,让用户随机访问到这两台负载均衡器上的任意一台上。这两个负载均衡器都会帮助做代理转发(按照某种算法进行转发),转发到这三台 nginx web服务器上任意一台。
在使用的两个服务器做负载均衡上安装keepalived实现高可用,定义这两台机器互为主备,实现双vip功能。
**算法指的是:**1、轮询 2、加权轮询 3、ip_hash
依据ip分配方式,指定负载均衡器按照基于客户端IP分配方式,这个方法确保了相同的客户端(ip)请求一致发送到相同的服务器,以保证session会话。这样每个访客都固定访问一个后端服务器,可以解决session不能跨服务器的问题。
4、least_hash:最少连接数,哪个机器连接数少就分发给哪个机器。
5、url_hash:按照访问的url的hash结果分配请求,是每个url定向到同一后端服务器上。
6、hash关键值:hash自定义的key。

负载均衡和高可用性是2个概念:
负载均衡:是将很多的请求分散到后端很多的服务器上,用来化解压力的一个软件。
高可用:有2个或者多个服务器(人)做相同的事情,互相备份。
keepalived高可用详解

3、web应用集群:这个时候web集群中的服务器上就会产生一个日志,然后我们会在这个nginx web服务器上安装filebeat,filebeat相当于一个生产者的角色,用来收集nginx web服务器上产生的日志。

4、kafka集群:filebeta收集到日志之后,会将数据发送到kakfa集群,filebeat会去请求kafka集群中的任意一个broker(创建了topic)。因为如果交互的那个broker里面不是partition的leader,那么follower会返回当前请求副本leader的信息,然后filebeat再跟leader交互。这里所有的partition都加了同一个名为nginxlog的topic中。filebeat将数据随机发送给一个partition中的leader之后,(partition中的ISR列表中的会将leader中的数据做一个同步,同步到其他follower上),然后leaer会返回一个ack信息,ack默认为1,leader收到消息就给生产者发送响应,然后filebeat就会接着发送下一条数据,从而保证了发送数据的一致性。

5、zookeeper:是用来管理kafka的,用来保存kafka的元信息、topic、partition、副本信息,通过抢占的方式选举出kafka的controller,这个controller用来管理kakfa副本partition中的leader和follower的选举。而zookeeper本身也是有leader和follower的,它的选举方式是采用一致性算法(zab),根据少数服从多数,票数过半的原则来选举leader。所以在zk集群中,机器存活数必须过半,集群才能正常使用,所以我们通常也会将zk集群的节点数设为奇数个,这是为了方便选举。跟filebeat给Kafka传递数据一样,Kafka连接任意一台zookeeper都可以操作,但是数据新增修改等事务操作必须在leader上运行。

扩展:客户端如果连接到follower上进行事务操作,follower会返回给leader的IP,最终客户端还是在leader上操作,但是可以直接连接follower进行查询操作。

5、消费者:然后我们需要创建一个python消费者程序(使用了pymysql模块)来消费数据(所以是不存在消费组的。如果有消费组的话,在同一个消费组,里面的消费者同一时刻只能消费一个partition的数量。一个消费组里面的每一个消费者可以同时去消费一个topic里面的不同分区里的数据。),获取到kahka里面的数据之后,对数据进行清洗,清洗成我们需要的IP地址、时间、带宽等字段,然后再通过一个淘宝接口,将IP地址解析成省份和运营商。
6、**mysql数据块:**最后再通过一个淘宝的接口和pymysql模块存入数据库中。

5、项目涉及知识详解

5.1、用户

在进行测试时,用户直接访问nginx web也可以,绑定域名访问。例如:www.sc.com这个域名的解析放在Windows的host文件里面,客户机需要使用的。

dns(域名)也可以做负载均衡,就是通过域名(例如www.sc.com)也可以解析成多个ip地址,以轮询的方式去解析各个ip。但是如果一个服务器挂了,dns不会立马将这个地址去掉,还会继续解析成挂掉的ip,可能会造成访问失败。虽然客服端有重试,但是还是会影响用户体验。

5.2 、代理集群

1、在应用集群 nginx web服务前面加反向代理。安全性也会高一点。负载均衡控制也会容易很多。
2、反向代理机中使用keepalived双vip互为主备做高可用,提高资源利用率。

代理集群:www.baidu.com 解析成两个虚拟ip:一个域名可以解析成两个虚拟ip。
假如:当上面一台一号代理机1.5master挂掉之后,下面二号代理机的1.5backup能运行,下一台二号代理机的1.6master也能正常运行;如果二号代理机1.6master挂掉了,也能使用一号代理机上1.6backup进行访问。

5.2.1、反向代理

1、什么是反向代理呢?代理的是什么?

反向代理代理的是服务器
正向代理:代理的是客户机(VPN)。
反向代理是充当Web服务器网关的代理服务器。当用户将请求发送到使用反向代理的Web服务器时,他们将先转到反向代理,由该代理将确定是将其路由到Web服务器还是将其阻止。这意味着有了反向代理,我们永远不会与使用它的Web服务器进行直接通信。可以将它们看作web服务器或服务器集群的某种包装器。通过负载平衡和缓存,它们可以保护web免遭攻击,并提供更好的web性能。

2、反向代理是如何工作的?

反向代理完全是服务器端工程师的工作。反向代理它充当服务器的网关,除了通过它们,没有请求能够到达使用它们的服务器。就像用户需要隐私保护,而服务器也是如此的。但是,除了可以防御攻击之外,有些还可以利用反向代理通过负载平衡和缓存来提高其Web性能。

对于使用这种类型代理的服务器或一组服务器。我们将不知道有关实际服务器的任何信息。但是可以知道反向代理的ip地址。当用户使用反向代理向web服务发送请求的时候,用户的请求将会直接发送到这个反向代理服务器上面。
然后,代理会决定如何处理该请求,是将其中继到适当的服务器还是发回禁止的错误响应。如果它授权用户的请求,则它将保存用户的请求副本(包括IP地址),并将用户的请求中继到适当的服务器。

当响应被发送回时,它被发送到反向代理,反向代理将其转发给用户。所有这些都是在短时间内发生的,用户甚至都不会注意到。某些反向代理实际上可以缓存资源,这意味着,如果用户正在寻找的信息已经在缓存中,它不必浪费资源,试图再次获取它,它会直接发送给用户,而不打扰服务器。

3、反向代理服务器端技术

反向代理服务器可以保护 nginx web服务免受各种类型的连接的侵害,或用作过滤、防火墙或其他安全性。

反向代理的3种必要应用程序之一——负载均衡,人们使用反向代理的最重要原因是负载平衡。

高流量的网站通常每分钟处理大量请求,这可能会减慢其系统性能并破坏响应时间。为了确保更好的用户体验和更快的响应,于是就使用了反向代理。它们将具有一组服务器,它们全部执行相同的功能。它没有使一台服务器响应所有请求,而是在服务器之间分配了请求,从而缩短了响应时间,并且不会使任何服务器工作过度。

5.2.2 负载均衡

代理集群中可用dns或者nginx反向代理做负载均衡。
在负载均衡中,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器。

1、什么是负载均衡?

负载均衡是高可用网络基础架构的关键组件,通常用于将工作负载分布到多个服务器来提高网站、应用、数据库或其他服务的性能和可靠性。

2、为什么使用负载均衡?

①一个没有负载均衡的 web 架构类似于下图:
在这里插入图片描述
在流程图中,我们可以看到用户是直接连接到web服务器,如果这个服务器宕机了,那么用户将无法访问到web服务器。此外,如果同时有很多用户试图访问web服务器,超过了其能处理的极限,就会出现加载速度缓慢或根本无法连接的情况。

②有负载均衡的web框架图如下所示:
在这里插入图片描述通过在后端引入一个负载均衡器和至少一个额外的 web 服务器,就可以缓解这个故障。通常情况下,所有的后端服务器会保证提供相同的内容,以便用户无论哪个服务器响应,都能收到一致的内容。
为解决负载均衡器的单点故障问题,可以将第二个负载均衡器连接到第一个上,从而形成一个集群。这样的话当主负载均衡器发生了故障,就需要将用户请求转到第二个负载均衡器。因为 DNS 更改通常会较长的时间才能生效,因此需要能灵活解决 IP 地址重新映射的方法,比如浮动 IP(floating IP)。这样域名可以保持和相同的 IP 相关联,而 IP 本身则能在服务器之间移动。

负载均衡器可以处理HTTP、HTTPS、TCP、UDP四种请求。

5.3 应用集群(nginx web服务)

应用集群:nginx web服务:仅仅做静态页面展示。在应用web前面加反向代理。安全性也会高一点。负载均衡控制也会容易很多。

用户通过域名或者代理集群中的某个ip地址来访问web集群中web服务的时候会产生nginx访问日志,日志存放在/var/log/nginx/sc_access,然后我们会在服务器上安装filebeat,filebeat相当于一个生产者的角色,用来收集nginx web服务器上的日志。

1、什么是filebeat?filebeat的工作原理是什么?

Filebeat是一个本地文件的日志数据采集器。在服务器上安装客户端后,filebeat会监控日志目录或者指定的日志文件,tail file,追踪读取这些文件(追踪文件的变化,不停的读),并且转发这些信息到MQ中间件,或者直接到elasticsearch或者logstarsh中存放,转发到kafka中进行索引 。

2、filebeat工作流程:

当开启Filebeat程序的时候,它会启动一个或多个探测器去检测指定的日志目录或文件。对于探测器找出的每一个日志文件,Filebeat会启动收集进程,每一个收集进程读取一个日志文件的内容,然后将这些日志数据发送到后台处理程序,后台处理程序会集合这些事件,最后发送集合的数据到output指定的目的地。

3、filebeat如何获取nginx日志信息?

根据filebeat的yaml格式文件的配置:数据和输入和输出来获取信息和发送信息:

#ymal格式
{
“filebeat.inputs”: [ #输入
{ “type”:“log”,
“enabled”:true,
“paths”:[“/var/log/nginx/sc/access.log” #这里在下面做了修改
},
],
}
对filebeat的/etc/filebeat/filebeat.yml的配置文件进行修改。
在这里插入图片描述
其中输入(input)为:nginx日志的存放路径(也就是在获取nginx上的日志信息):/var/log/nginx/sc/access.log;
输出(output)为:kafka的同一个topic中(三台安装了kafka的机器都需要配置好各自的ip地址)。
output只支持一个输出。
nginx 的web中的日志只需要吐一次,将数据吐到kafka中,不需要去filebeat里面,如果需要,直接去kafka里面去拿,而且filebeat中只支持一个输出(output)。如果在/etc/filebeat下文件写入多个output会导致服务重启不了。

4、为什么选择filebeat?

日志采集器有很多,比如Logstash,虽然Logstash的功能强大,但是它依赖java并且在数据量大的时候进程会消耗过多的系统资源,会严重影响业务系统的性能。

而filebeat就是一个完美的替代者,它基于Go语言没有任何依赖,配置文件简单。同时,filebeat比logstash更加轻量级,所以占用系统资源极少,非常适合安装在生产机器上。

Filebeat可以直接将数据发送到Elasticsearch、Kafka或者Redis,在那里可以进一步处理和增强数据,然后在Kibana中将其可视化,目前来说Filebeat是 ELK 日志系统在Agent上的第一选择。

5.4 kafka集群

5.4.1 什么是kafka?

Kafka是是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等。
kafka是一个消息中间件。
kafka通常用在:日志收集、业务解耦、流量削峰。

kafka的主要应用场景是:日志收集系统和消息系统。

5.4.2 什么是消息中间件?

面向消息的系统(消息中间件)是在分布式系统中完成消息的发送和接收的基础软件。消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。

消息中间件:是两种通信模式:点对点、发布订阅(kafka)。
kafka是消息中间件中的一种。kafka支持的通信方式:发布—订阅。

5.4.3 使用kafka做日志统一收集的原因

1、故障发生时方便定位问题
2、日志集中管理,后续需要日志的程序需要直接懂kafka获取日志即可。
3、尽可能的减少日志处理对nginx的影响。

kafka的日志在/opt/kafka_2.12-2.8.1/logs下,主要看server.log

5.4.4 kafka基本架构

请添加图片描述
从上图中,producer就是生产者,是数据的入口。图中的producer在写入数据的时候永远在找leader,不会直接将数据写入follower。
消息写入leader之后,follower是主动的去leader进行同步的!producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!

5.4.5 kafka相关概念

Producer:生产者,消息的产生者,是消息的入口。
Broker:broker是kafka实例,每个服务器上有一个或者多个kafka的实例,我们暂时认为每个broker对应一台服务器,每个kafka集群内的broker都有一个不重复的编号,如:broker1、broker2、broker3。
Topic:消息的主题,可以理解为消息的分类,kafka的数据保存在topic。在每个broker上都可以创建多个topic。
Partition:topic的分区,每个topic可以有一个或者多个分区(partition),partition是比较具体的东西,partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、log文件、.timeindex文件三个文件,log文件就是实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同分区的数据是不重复的,partition的表现形式就是一个一个的文件夹。
在这里插入图片描述
图中,这个的partition有三组segment文件,每个log文件的大小是一样的,但是存储的message数量是不一定相等的,(每条的message大小不一致)。文件的命名是以该segment最小offset来命令的,如000.index存储offset为0~368795的消息,kafka就是利用分段+索引的方式来解决查找效率的问题。

Segment:partition物理上由多个segment组成,Partition的结构:Partition在服务器上的表现形式就是一个一个的文件夹,每个Partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件这三个文件,log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。

Replication:每个分区都有多个副本,副本的作用就是做备胎。当主分区(leader)故障的时候会选择一个备胎(follower)上位,成为leader。在kafka中默认副本的最大数量是10个,并且副本的数量不能大于一个副本(包括自己)。

kafka的副本机制每个主题上会划分成若干个分区。副本就是在分区层级下定义的,每个分区都会配置若干个副本。在同一个分区下的所有副本保存有相同的消息序列,这些副本保存在不同的broker上,能够对抗部分broker宕机带来的数据不可用。

Message:每一条发送的消息主体。
Consumer:消费者,即消息的消费方,是消息的出口。
Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量。
Zookeeper:kafka集群依赖zookeeper来保存集群的元信息,来保证系统的可用性。

5.4.6 kafka工作流程分析

1、发送数据:

在这里插入图片描述
从上图中,producer就是生产者,是数据的入口。图中的producer在写入数据的时候永远在找leader,不会直接将数据写入follower。
消息写入leader之后,follower是主动的去leader进行同步的!producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的。

2、写入数据示意图如下:

在这里插入图片描述
数据会写入到不同的分区,那kafka为什么要做分区:
分区的主要目的是:

方便扩展。因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松应对日益增长的数据量。

提高并发。以partition为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。

3、保存数据:

producer将数据写入kafka后,集群就需要对数据进行保存了!kafka将数据保存在磁盘,kafka初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。

5.4.6 kafka如何保证高可用?

多个broker +多个partition +多个 replica

5.4.7 kafka如何保证数据一致性?

如何保证消费者消费数据一致性:消费者可以连接到任何一台机器,机器会告诉它leader在哪里,如何通过leader进行外界交互。
同一个消费组,里面的消费者同一时刻只能消费一个partition的数量。
一个消费组里面的每一个消费者可以同时去消费一个topic里面的不同分区里的数据。

针对topic来说。只要能连接上这个kafka,消费者都可以进行消费,只要有权限获取到topic,消费者之间的消费互不影响。

1、producer可以通过request.required.acks设置。
① 生产者一致性:
保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,为了保证消息不丢失,就可以通过ACK回答机制。在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,ack参数可以设置为1、1、-1
acks =0 ,生产者不会等待任何来自服务器的响应。生产者不需要接收响应,发完就发下一条,不确保消息是否发送成功。安全性最低但是效率最高。
ack1=1 (默认值):leader收到就会发送响应给生产者,只需要确保leader发送成功,再发送下一条。
acks=-1 (ISR),等待ISR列表中的每一个副本都接收到,才给生产者响应。生产者才会接收到来自服务器的成功响应,这个模式是最安全的,但是效率很低。
acks=2,除了一个leader收到,还需要一个flower收到。
ack= n,除了一个leader收到,还需要等到n-1个follower收到。
ack= all,代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。

2、消费者消费数据时,引入了High Water Mark机制、木桶效应,只能消费ISR列表里偏移量最小的副本的消息数量。

5.4.8 kafka中怎样才能知道消费者消费到哪一步了?

1、使用offset偏移量:记录消费者消费偏移量。
本来offset保存在本地,但是现在保存在kafka的**__consumer_offsets**中主题中,每次消费每隔几秒就会写入,需要的时候,就会自动去kafka的主题里面拿。
也阔以自己写一个文档,里面的写入你上次消费的记录。
消费者消费的时候,会记录自己的消费偏移量,消费偏移量可以自己保存在本地,也可以提交到kafka的_consumer_offsets主题里的保存。

bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.152:9092 --topic nginxlog --from-beginning
执行kafka-console-consumer.sh的时候,就会自动保存到kafka主题中。

[root@nginx-kafka01 opt]# cd /data
[root@nginx-kafka01 data]# ls
cleaner-offset-checkpoint  __consumer_offsets-30  log-start-offset-checkpoint
__consumer_offsets-0       __consumer_offsets-33  meta.properties
__consumer_offsets-12      __consumer_offsets-36  nginxlog-0
__consumer_offsets-15      __consumer_offsets-39  recovery-point-offset-checkpoint
__consumer_offsets-18      __consumer_offsets-42  replication-offset-checkpoint
__consumer_offsets-21      __consumer_offsets-45  replication-offset-checkpoint.tmp
__consumer_offsets-24      __consumer_offsets-48  sc-0
__consumer_offsets-27      __consumer_offsets-6   sc-1
__consumer_offsets-3       __consumer_offsets-9   sc-2
[root@nginx-kafka01 data]# cd nginxlog-0/
[root@nginx-kafka01 nginxlog-0]# ls
00000000000000000000.index  00000000000000000000.timeindex
00000000000000000000.log    leader-epoch-checkpoint
index记录每个offset所对应的的每个位置,是一个二进制文件,真正的是存放在多个log文件中。
发送过来的数据都存放在log文件中。
/data里面存放kafka数据的位置:
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

数据的存储目录:
文件夹:<topic_name>-<分区号>
每一个partition的数据都是由很多个segment存储,每一个segment(段)。每一个segment由一个index(索引)和log文件组成。分出多个segment:段便于做数据清理。

存放在多个log文件里面的原因:
方便kahka清除数据,按时间或者大小存储,不会永久保存,定期保存,默认保存七天,可以进行修改。修改配置文件为:/opt/kafka_2.12-2.8.1/config

kafka可以按照两个维度清理数据:
1、按照大小
2、按时间
任意一个条件满足,都可以触发日志清理。

5.5 zookeeper

5.5.1 什么是zookeeper?

zookeeper是分布式应用协调管理服务、配置管理、域名管理、分布式数据管理、集群管理。

5.5.2 zookeeper的工作原理

zookeeper架构图:
在这里插入图片描述
zk集群中,节点存活数必须过半,集群才能正常使用。一般集群的节点数都设置为奇数,方便选举,而且容错率一样。
如果不过半容易脑裂。就比如一个大集群,里面有5个节点,zk1-5,里面存活的数必须过半,进行选举,所以节点里面一般以奇数。如果zk4和zk5坏掉了,至少也得有前三台都存活。

什么是脑裂?脑裂(split -brain)就是“大脑分裂”,也就是本来一个“大脑”被拆分成了两个或多个“大脑”,我们都知道,如果一个人有多个大脑,如何是相互独立,就会导致人体“手舞足蹈,不听使唤”。
脑裂通常会出现在集群环境中,比如ElasticSearch、Zookeeper集群,而这些集群环境有一个统一的特点,就是它们有一个大脑,比如ElasticSearch集群中有一个Master节点,Zookkeeper集群中有Leader节点。

Zookeeper集群中的脑裂场景
对于一个集群,想要提高这个集群的可用性,通常会采用多机房部署,比如现在有一个由6台zkServer所组成的一个集群,部署在两个机房里面:在这里插入图片描述
正常情况下,这个集群中只会有一个leader,那么如果机房之间的网络断掉之后,两个机房内的zkServer还是可以相互通信的,如果不考虑过半机制的话,那么就会有出现每个机房内部都将会选出一个Leader。
在这里插入图片描述
对于这种情况的话,我们可以看得出来,原本应该是统一的一个集群对外提供服务的,但是现在变成了两个集群同时对外提供服务,如果过了一会,断了的网络突然联通了,那么此时就会出现问题了,两个集群刚刚都对外提供服务了,就会出现数据应该怎么合并,数据冲突怎么解决等问题。
刚刚说到脑裂场景的时候,有一个前提条件就是没有考虑过半机制,所以实际上Zookeeper集群中是不会出现脑裂问题的,而不会出现的原因就跟过半机制相关。

过半机制

在领导者选举的过程中,如果某台zkServer获得了超过半数的选票,则以zkServer就可以成为Lerder了。
过半机制的源码实现是通过下面操作:

public class QuorumMaj implements QuorumVerifier {
    private static final Logger LOG = LoggerFactory.getLogger(QuorumMaj.class);
    
    int half;   
    // n表示集群中zkServer的个数(准确的说是参与者的个数,参与者不包括观察者节点)
    public QuorumMaj(int n){
        this.half = n/2;
    }
    // 验证是否符合过半机制
    public boolean containsQuorum(Set<Long> set){
        // half是在构造方法里赋值的
        // set.size()表示某台zkServer获得的票数
        return (set.size() > half);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

核心代码部分为:

this.half = n/2;
return (set.size() > half);

就比如:如果现在集群中有5台zkServer,那么half=5/2=2,也就是说,领导者选举的过程中至少有三台zkServer投了同一个zkServer,才会符号过半机制,才会选举出一个Leader。
那为什么选举的过程中为什么一定要有一个过半机制验证呢?
因为这样不需要等待所有的zkServer都投了同一个zkServer就可以选举出来一个Leader了,这样比较快,所以叫快速领导者选举算法。
那么过半机制中为什么是大于,而不是大于等于?
这就跟脑裂问题有关系了,就比如回到上面出现脑裂问题的场景:
当机房中间的网络断掉之后,机房1台内的三台服务器会进行领导者选举,但是此时过半机制的条件是set.size()>3,也就是说至少要4台zkServer才能选举出来一个Leader,所以对于机房1来说它不能选出一个Leader,同样机房2也不能选出一个Leader,这种情况下整个集群当机房间的网络断掉之后,整个集群都将没有leader。
而如果过半机制的条件是set.size()>=3,那么机房1和机房2都会选出一个leader,这样就出现了脑裂。所以我们就知道了。为什么过半机制是大于,而不是大于等于。就是为了防止脑裂。

如果假设我们现在只有5台机器,也部署在两个机房:
此时过半机制的条件是set.size()>2,也就是至少要3台服务器才能选出一个leader,此时机房之间的网络断开了,对于机房1来说是没有影响的,Leader依然还是Leader,对于机房2来说是选不出来Leader的,此时整个集群中只有一个Leader。
所以,我们可以总结得出,有了过半机制,对于一个Zookeeper集群,那么没有Leader,要么只有一个leader,这样就避免了脑裂问题。

写的话是通过leader。读的话leader和follower都可以。
可以直接连接follower进行查询操作。
follower:查询、选举。选举是少数服从多数。

5.5.3 zookeeper与kakfa的联系

zookeeper是用来管理kafka的,用来保存kafka的元信息、topic、partition、副本信息,通过抢占的方式选举出kafka的controller,这个controller用来管理kakfa副本partition中的leader和follower的选举。

而zookeeper本身也是有leader和follower的,它的选举方式是采用一致性算法(zab),根据少数服从多数,票数过半的原则来选举leader。

所以在zk集群中,机器存活数必须过半,集群才能正常使用,所以我们通常也会将zk集群的节点数设为奇数个,这是为了方便选举。

跟filebeat给Kafka传递数据一样,Kafka连接任意一台zookeeper都可以操作,但是数据新增修改等事务操作必须在leader上运行。

客户端连接任意一台zk都可以操作:但是数据新增修改等事物操作必须在leader上运行。客户端如果连接到follower上进行事务操作,follower会返回给leader的IP,最终客户端还是在leader上操作,但是可以直接连接follower进行查询操作。

follower:查询、选举。选举是少数服从多数。
zookeeper中数据的同步,只要过半节点同步完成,就表示数据已经提交。(commit)
zookeeper不是强一致性的。它属于最终一致性。
zookeeper里面存放的数据很少,
每个节点存储的数据量默认不会超过1M。存储速度快。
zookeeper相当于一个文件树。

5.7 redis(扩展):数据库结构服务器

redis通常被称为数据库结构服务器,Redis不仅仅支持简单的key-value类型的数据,同时还提供list、set、zset、hash等数据结构的存储。
有丰富的数据类型—Redis支持二进制案例的Strings, Lists, Hashes, Sets 及 Ordered Sets 数据类型操作。
不同的类型执行不同的指令:list:lpush。

memcached vs redis:缓存

redis:常用于缓存里面。
redis是一种键值存储的数据库。

redis开启持久化的两种模式

AOF 全持久化模式 每一次操作都会同步到磁盘(同步的是每次操作日志到磁盘),安全,但是效率低。
RDB 半持久化模式 定时的将内存内容快照写入磁盘(将内容备份写入磁盘,定时写一次)

为什么用kafka做日志收集而不用redis?

1、kafka适用于做日志收集平台。
而redis大多数使用在做kv数据存储上。
2、redis也有一个queue的数据类型,用来做发布订阅系统的。redis只有单一的消费者,不能分成多个消费组。它里面的消息都只能消费一次。
kafka可以通过offset的设置来重复消费的。
3、redis数据是存储在内存里面的,kafka是存储在磁盘里面的。

redis的安装配置

1、先安装redis

yum  install epel-release -y
yum install redis -y
  • 1
  • 2

2、修改监听的ip:

vim /etc/redis.conf
bind 0.0.0.0
  • 1
  • 2

3、启动redis

[root@nginx-kafka01 ~]# systemctl start redis
  • 1

默认端口:6379
查看端口:lsof -i:6379
或者:netstat -anplut|grep redis

redis数据库 0-15号库:
4、进入redis:

[root@nginx-kafka01 ~]# redis-cli
127.0.0.1:6379> select 0  选择0 1 2 3号库
OK
127.0.0.1:6379> select 1
OK
127.0.0.1:6379[1]> select 2
OK
127.0.0.1:6379[2]> select 3
OK
127.0.0.1:6379[3]> set a 123
OK
127.0.0.1:6379[3]> get a  设置key
"123"
 127.0.0.1:6379[3]> keys a*  获取以a开头的key
1) "a"
127.0.0.1:6379[3]> set abc 444 设置key
OK
127.0.0.1:6379[3]> set aaa 555
OK
127.0.0.1:6379[3]> set ddd 666
OK
127.0.0.1:6379[3]> keys a* 
1) "abc"
2) "a"
3) "aaa"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

列表:使用lpush:

127.0.0.1:6379[3]> lpush sanchuang call
(integer) 1
127.0.0.1:6379[3]> lpush sanchuang 1
(integer) 2
127.0.0.1:6379[3]> lpush sanchuang nnn
(integer) 3
127.0.0.1:6379[3]> lpush sanchuang yyy
(integer) 4
127.0.0.1:6379[3]> lrange sanchuang 0 10
1) "yyy"
2) "nnn"
3) "1"
4) "call"
127.0.0.1:6379[3]> rpop  sanchuang   #移除并获取列表的第一个元素。
"call"
127.0.0.1:6379[3]> lrange sanchuang 0 10  #获取列表指定范围内的元素:获取列表0到10内的元素。
1) "yyy"
2) "nnn"
3) "1"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

Python里面连接redis的模块库,python-pymysql- mysql python-pymysql -

5.8 python消费者程序

编写的python程序代码存放在gitee中:

python_consumer.py

5.9 mysql 数据库

数据存入mysql中的,需要先创建好mysql数据库:

1、需求分析
需要nginx日志的ip,时间,带宽字段
将ip字段解析成相应的省份、运营商
存入数据库的字段: id, 时间, 省份, 运营商, 带宽
带宽:通过nginx.log去获取。
通过淘宝接口:
使用pykafka模块。

#步骤
1、创建数据表
2、编写python脚本, 从kafka获取nginx日志
3、获取好的nginx日志,提取出ip,时间,带宽字段
4、提取出的ip字段通过淘宝的一个接口解析出省份和运营商。
url = “https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip=114.114.114.114”
5、格式化时间字段 “2021-10-12 12:00:00”
6、存入数据库

#创建表
create table nginxlog (
id  int primary key auto_increment,
dt  datetime not null,
prov varchar(256) ,
isp  varchar(256),
bd  float
) CHARSET=utf8;

create table prov_index(
id  int primary key auto_increment,
prov_name  varchar(256)
) charset=utf8;

create table isp_index(
id int primary key auto_increment,
isp_name varchar(256)
) charset=utf8;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
python程序:
import json
import requests
import time
import pymysql

taobao_url = "https://ip.taobao.com/outGetIpInfo?accessKey=alibaba-inc&ip="

db = pymysql.connect(host="192.168.2.152", user="sc", password="123456", port=3306, db="sanchuang", charset="utf8")
cursor = db.cursor()

# 查询ip地址的信息(省份和运营商isp),通过taobao网的接口
def resolv_ip(ip):
    response = requests.get(taobao_url + ip)
    if response.status_code == 200:
        tmp_dict = json.loads(response.text)
        prov = tmp_dict["data"]["region"]
        isp = tmp_dict["data"]["isp"]
        return prov, isp
    return None, None

# 将日志里读取的格式转换为我们指定的格式
def trans_time(dt):
    # 把字符串转成时间格式
    timeArray = time.strptime(dt, "%d/%b/%Y:%H:%M:%S")
    # timeStamp = int(time.mktime(timeArray))
    # 把时间格式转成字符串
    new_time = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
    return new_time

# 从kafka里获取数据,清洗为我们需要的ip,时间,带宽
from pykafka import KafkaClient
client = KafkaClient(hosts="192.168.2.152:9092,192.168.2.132:9092,192.168.2.137:9092")
topic = client.topics['nginxlog']
balanced_consumer = topic.get_balanced_consumer(
    consumer_group='testgroup',
    auto_commit_enable=True,
    zookeeper_connect='nginx-kafka01:2181,nginx-kafka02:2181,nginx-kafka03:2181'
)
# consumer = topic.get_simple_consumer()
for message in balanced_consumer:
    if message is not None:
        try:
            line = json.loads(message.value.decode("utf-8"))
            log = line["message"]
            tmp_lst = log.split()
            ip = tmp_lst[0]
            dt = tmp_lst[3].replace("[", "")
            bt = tmp_lst[9]
            dt = trans_time(dt)
            prov, isp = resolv_ip(ip)
            if prov and isp:
                print(ip, prov, isp, dt,bt)
                cursor.execute('insert into nginxlog(dt,prov,isp,bd) values("%s", "%s", "%s", "%s")' % (dt, prov, isp, bt))
                db.commit()
                print("保存成功")
        except ValueError as e:
            print("修改失败", e)
            db.rollback()
db.close()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

6.0 celery:分布式任务处理模块

celery异步任务:要将监控的任务写入celery_tasks中的scheduled_task任务。指令下发
celery分布式任务处理模块:
任务处理:
1、获取监控项
2、获取数据库日志
3、对比数据库日志和监控项阈值,如果超出范围就发邮件告警。
在这里插入图片描述
1、安装celery:
Python里面连接redis的模块库,python-pymysql- mysql python-pymysql -

pip install  celery  
pip install  redis
  • 1
  • 2

2、生产者生产任务:celery_app:核心对象。执行的是定时任务。
#编辑celery 参照flask_log/celery_app
3、config.py:celery 的配置文件

from celery.schedules import crontab
  • 1

4、配置消息中间件的地址 #启动worker的地址

BROKER_URL = "redis://192.168.77.152:6379/1"
  • 1

5、启动celery时,导入任务,只要导入任务才能执行

CELERY_IMPORTS = {
      'celery_tasks'   #最开始时候是没有任务的,存放celery要执行的任务
}
  • 1
  • 2
  • 3

6、时区

CELERY_TIMEZONE = "Asia/Shanghai"
  • 1

7、设置了一个定时任务

CELERYBEAT_SCHEDULE = {
   'log-every-minute': {
     'task' : 'celery_tasks.scheduled_task',    #是celery_tasks中的scheduled_task任务,千万不能打错
     'schedule': crontab(minute='*/1')   
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

8、celery_tasks.py:存放任务的文件

`from app import celery_app
  • 1
 @celery_app.task
 def scheduled_task(*args,**kwargs):  
    print("this is scheduled_task")
  • 1
  • 2
  • 3

加入装饰器就是一个任务,不加装饰器就是一个普通对象。

9、app.py:核心对象,生成celery的核心对象;存放celery核心对象的文件

from celery import Celery
  • 1

实例化celery对象,传入一个名字即可

celery_app = Celery('celery_app')
  • 1

从config中获取东西

celery_app.config_from_object('config')
  • 1

启动worker:启动消费者,消费任务。

[root@nginx-kafka01 celery_task]# celery -A app.celery_app worker --loglevel=INFO -n node1
py` to avoid these warnings and to allow a smoother upgrade to Celery 6.0.
[2022-07-25 16:45:51,567: INFO/MainProcess] Connected to redis://192.168.2.152:6379/1
[2022-07-25 16:45:51,574: INFO/MainProcess] mingle: searching for neighbors
[2022-07-25 16:45:52,610: INFO/MainProcess] mingle: all alone
[2022-07-25 16:45:52,645: INFO/MainProcess] celery@nodel ready.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

生成的任务每分钟会打印出来。
先启动beat:

root@nginx-kafka01 celery_task]# celery -A app.celery_app beat --loglevel=INFO
celery beat v5.1.2 (sun-harmonics) is starting.
__    -    ... __   -        _
LocalTime -> 2022-07-25 17:08:53
  • 1
  • 2
  • 3
  • 4

worker和beat都要执行核心对象文件,将不同的work就有不同的消费者执行任务,消费不过来的时候可以再多加几个worker。
写监控接口:增删改查
10、celery程序,api接口授权。
访问接口实现对数据库的增删改查。
数据库:id ,prov ,lsp、bd、action
接口:/monitor ---- restful的接口规范
状态:100000 —成功。

到这里项目的详解就结束了,如有理解错误的地方,请大家进行批评指正。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/192728
推荐阅读
相关标签
  

闽ICP备14008679号