当前位置:   article > 正文

ZooKeeper及Kafka学习与ELK+Kafka+Filebeat 集群架构搭建

ZooKeeper及Kafka学习与ELK+Kafka+Filebeat 集群架构搭建

愿你熬过万丈孤独,藏下星辰大海


目录

# 背景分析

# 1.关于 Zookeeper

## 1.1 zookeeper 概念

## 1.2 zookeeper 作用

## 1.3 zookeeper 安装与使用--含启动错误

### 1.3.1 zookeeper下载与安装

#### 1.3.1.1 集群模式环境及安装

#### 1.3.1.2 启动出现问题

### 1.3.2 企业中的一些环境

### 1.3.3 Zookeeper 的数据结构和数据模型

#### 1.3.3.1 Znode 数据节点

#### 1.3.3.2 Znode 节点类型

### 1.3.4 Zookeeper 一致性协议原理

#### 1.3.4.1 Zookeeper 服务端角色

#### 1.3.4.2 一致性协议 -- ZAB

#### 1.3.4.3 Zookeeper Leader -- 领导选举算法

#### 1.3.4.4 FastLeaderElection 选举参数解析

#### 1.3.4.5 崩溃恢复

## 1.4 消息队列

### 1.4.1 消息队列适合场景

### 1.4.2 消息队列主流产品

### 1.4.3 获取数据的推、拉两种方案对比

### 1.4.4 消息队列消费模型

## 2.1 Kafka 基础知识及环境搭建

### 2.1.1 Kafka 介绍

### 2.1.2 Kafka 消息引擎模型

### 2.1.3 kafka 设计分区原因

### 2.1.4 kafka 使用场景

### 2.1.5 kafka 速度快的理由

### 2.1.6 kafka 集群搭建

## 3.1 ELK+Kafka+Filebeat 集群架构

### 3.1.1 环境部署

#### 3.1.1.1 配置 Filebeat 输出到 Kafka 集群

#### 3.1.1.2 Logstash 从 Kafka 读取数据,并输出到 elasticsearch

#### 3.1.1.3 将 Filebeat 采集的日志通过 Logstash 输出到 Elasticsearch 和 Kibana

#### 3.1.1.4 Kibana 操作--192.168.182.131

## 4.1 小结


先装zookeeper再装kafka

 

# 背景分析

        1) 后台系统由集中式发展为分布式

        随着计算机系统的规模及业务量的迅速提升和互联网的爆炸式增长,集中式
系统采用大型主机单机部署产生了一些问题:系统大而复杂、难于维护、发生单点故障引起雪崩、扩展性差等。
        这些问题都使业务面临巨大的压力和严重的风险。分布式系统是一个硬件 或软件组件分布在不同的网络计算机上,彼此之间仅仅通过消息传递进行通信和协调 的系统,可以很好的解决系统扩容、可用性以及降低成本。
        2) 分布式系统架构引入的新问题
        分布式系统架构带来了优点的同时,也提出了一系列的挑 战:
1 )由于多节点甚至多地部署,节点之间的数据一致性如何保证?
2 )在并发场景下如何保证任务只被执行一次?
3 )一个节点挂掉不能提供服务时如何被集群知晓并由其他节点接替任务?
4 )存在资源共享时,资源的安全性和互斥性如何保证?
        3) 分布式协调组件
        为解决分布式系统中面临的问题,开发者通过工程实践创造了很多 分布式系统协调组件,这些组件可以在分布式环境下,保证分布式系统的数据一致性 和容错性等。其中为我们熟知的有: ZooKeeper ETCD Consul 等。
        ZooKeeper 作为 Apache 的顶级开源项目,基于 Google Chubby 开源实现,在 Hadoop Hbase Kafka 等技术中充当核心组件的角色。

# 1.关于 Zookeeper

## 1.1 zookeeper 概念

        ZooKeeper 是一个分布式的,开源的分布式应用程序协调服 务,它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域
名服务、分布式同步、组服务等。 ZooKeeper 允许读取、写入数据和发现数据更新。数据按层次结构组织在文 件系统中,并复制到 ensemble (一个 ZooKeeper 服务器的集合)中所有的 ZooKeeper 服务器。对数据的所有操作都是原子的和顺序一致的。 ZooKeeper 通过 Zab 一致性协议在 ensemble 的所有服务器之间复制一个状态机来确保此特性。

## 1.2 zookeeper 作用

分布式应用程序可以基于 ZooKeeper 实现:
  • 数据的发布和订阅
  • 服务注册与发现
  • 分布式配置中心
  • 命名服务
  • 分布式锁
  • Master 选举
  • 负载均衡
  • 分布式队列

## 1.3 zookeeper 安装与使用--含启动错误

### 1.3.1 zookeeper下载与安装

ZooKeeper 下载: https://zookeeper.apache.org/releases.html
选择稳定版本,点击下载并上传 apache-zookeeper-3.7.0-bin.tar.gz Linux
务器上。
ZooKeeper 的工作模式有三种 : 单机模式、集群模式、伪集群模式。
  • 单机模式:Zookeeper只运行在一台服务器上,适合测试环境;
  • 伪集群模式:就是在一台物理机上运行多个Zookeeper 实例;
  • 集群模式:Zookeeper运行于一个至少有三个节点以上集群中,适合生产环境;
#### 1.3.1.1 集群模式环境及安装
IPServermyid
192.168.182.131Zookeeper1
192.168.182.153Zookeeper2
192.168.182.154Zookeeper3

 分别在三个虚拟机上安装Zookeeper,以下只展示一台主机步骤

①安装

  1. # 创建安装包目录
  2. mkdir /opt/zookeeper/
  3. # 解压上传的安装包
  4. tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz -C /opt/zookeeper/
  5. # 创建zookeeper存放目录及数据库目录,日志存放目录:
  6. mkdir -p /opt/zookeeper/apache-zookeeper-3.7.1-bin/data
  7. mkdir -p /opt/zookeeper/apache-zookeeper-3.7.1-bin/logs
  8. # 安装JDK
  9. rpm -ivh jdk-8u45-linux-x64.rpm
  10. echo -e 'export JAVA_HOME=export JAVA_HOME=/usr/local/java/jdk1.8.0_161
  11. export JRE_HOME=${JAVA_HOME}/jre
  12. export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$CLASSPATH
  13. export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin
  14. export PATH=$PATH:${JAVA_PATH}' >> /etc/profile
  15. ## 使用source命令使配置文件修改即时生效,无需重启服务器
  16. source /etc/profile

②配置文件修改--注:存在空格会导致启动不成功,因此请将配置文件中多余空格删掉

  1. # 在192.168.182.131主机先修改配置文件
  2. ## 系统下载的zoo_sample.cfg配置文件的下划线不能识别,因此要为其重命名并修改
  3. cd /opt/zookeeper/apache-zookeeper-3.7.1-bin/conf
  4. cp zoo_sample.cfg zoo.cfg
  5. vim zoo.cfg
  6. ## 内容如下:
  7. ### 客户端与服务器或者服务器与服务器之间维持心跳,也就是每个tickTime时间就会发送一次心跳
  8. tickTime=2000
  9. ### 集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数(tickTime的数量)
  10. initLimit=10
  11. ### 集群中flower服务器(F)跟leader(L)服务器之间的请求和答应最多能容忍的心跳数
  12. syncLimit=5
  13. ### 该属性对应的目录是用来存放myid信息跟一些版本,跟服务器唯一的ID信息等
  14. dataDir=/opt/zookeeper/apache-zookeeper-3.7.1-bin/data
  15. ### 该实例对应日志目录
  16. dataLogDir=/opt/zookeeper/apache-zookeeper-3.7.1-bin/logs
  17. ### 客户端连接的接口,客户端连接zookeeper服务器的端口,zookeeper会监听这个端口,接收客户端的请求访问!这个端口默认是2181
  18. clientPort=2181
  19. server.1=192.168.182.131:2888:38888
  20. server.2=192.168.182.153:2888:38888
  21. server.3=192.168.182.154:2888:38888
  22. # 同步配置文件到其他两台节点:
  23. scp zoo.cfg root@192.168.182.153:/opt/zookeeper/zookeeper-3.4.14/conf/
  24. scp zoo.cfg root@192.168.182.154:/opt/zookeeper/zookeeper-3.4.14/conf/
  25. 注: zookeeper 集群,每个节点的配置文件都是一样的。所以直接同步过去,不需要做任何修改

③创建myid文件

  1. # 192.168.182.131
  2. echo 1 > /opt/zookeeper/apache-zookeeper-3.7.1-bin/data/myid
  3. # 192.168.182.153
  4. echo 2 > /opt/zookeeper/apache-zookeeper-3.7.1-bin/data/myid
  5. # 192.168.182.154
  6. echo 3 > /opt/zookeeper/apache-zookeeper-3.7.1-bin/data/myid

启动服务&查看节点状态

  1. cd /opt/zookeeper/apache-zookeeper-3.7.1-bin/bin/
  2. # 三台节点同时执行如下命令启动服务并查看是否正常
  3. ./zkServer.sh start
  4. # 查看状态
  5. ./zkServer.sh status
#### 1.3.1.2 启动出现问题

1、端口冲突问题

        在3.5.5版本及以上,Zookeeper 提供了一个内嵌的Jetty容器来运行 AdminServer,默认占用的是 8080端口,AdminServer 主要是来查看 Zookeeper 的一些状态,如果机器上有其他程序(比如:Tomcat)占用了 8080 端口,也会导致 Starting zookeeper … FAILED TO START 的问题。

  • 如果不需要 AdminServer ,可以直接禁用:打开 zoo.cfg 配置文件,直接添加以下语句即可
  1. # 禁用 AdminServer 服务
  2. admin.enableServer=false
  • 如果想使用 AdminServer , 那么可以直接在 zoo.cfg 配置文件中修改端口号即可,比如让其绑定 9000
  1. # admin port
  2. admin.serverPort=9000

2、"Zookeeper Invalid config, exiting abnormally"问题请仔细检查配置文件,字母和空格都不要放过

3、添加多台服务岂会涉及选举问题,启动时先启动master,再启动其他follower,请不要用以下方法同时启动,本人已替大家试错且检查了1个小时问题

启动成功展示:

⑤输出jps查看进程

⑥查看zookeeper服务输出日志信息

⑦启动连接实例--会显示连接信息--三个都连一下,后面会获取信息

# 所有实例全部启动过后,选择任一实例进行连接,命令行输入如下命令:
  1. cd /opt/zookeeper/apache-zookeeper-3.7.1-bin
  2. ./bin/zkCli.sh -server 127.0.0.1:2181

 ⑧创建节点

        连接之后,可以在当前实例上创建节点,类似于创建一个 kv 值或者文件夹( ZK 的命
令和可选参数读者可以自行查看用户手册)
  1. # 在192.168.182.153服务器中创建节点create表示创建命令,/zk-demo为节点名称 123为节点值
  2. ## 相当于"/zk-demo"为目录(key),"123"为文件(value)
  3. [zk: 127.0.0.1:2181(CONNECTED) 1] create /zk-demo 123
  4. Created /zk-demo
  5. # 获取节点值 get表示获取 /zk-demo为需要获取的节点名称
  6. [zk: 127.0.0.1:2181(CONNECTED) 2] get /zk-demo
  7. 123

创建节点结果展示:

 ⑨在其他实例上获取 153服务器实例创建的节点

由于 zk 会将节点写入的值同步到集群中每个节点,从而保证数据的一致性,那
么其他节点理论上也可以访问到刚刚 153服务器 创建的值。

其他结果获取展示:

### 1.3.2 企业中的一些环境

  • 开发环境:开发调测,供开发人员测试
  • 测试环境:功能测试,压力测试等,供测试人员测试
  • 伪/类生产环境:部署后与开发,测试人员调节测试
  • 生产环境:对外提供服务--政企单位,银行系统等

### 1.3.3 Zookeeper 的数据结构和数据模型

#### 1.3.3.1 Znode 数据节点

整体结构类似于 linux 文件系统的模式以树形结构存储,其中根路径以 / 开头:

        如图,在根目录下创建 Dog 和 Cat 两个不同的数据节点,Cat 节点下有

TomCat 这个数据存储节点,整个 ZooKeeper 的树形存储结构就是这样的 Znode
成,并存储在内存中。
命令行下使用 ZooKeeper 客户端工具创建节点的过程如下:首先连接一个 zk
./bin/zkCli.sh -server 127.0.0.1:2181

创建节点:

  1. [zk: 127.0.0.1:2181(CONNECTED) 5] create /Dog
  2. Created /Dog
  3. [zk: 127.0.0.1:2181(CONNECTED) 6] create /Cat
  4. Created /Cat
  5. [zk: 127.0.0.1:2181(CONNECTED) 7] create /Cat/TomCat
  6. Created /Cat/TomCat

使用"ls"命令查看哥哥目录下的节点数据:

  1. [zk: 127.0.0.1:2181(CONNECTED) 8] ls /
  2. [Cat, Dog, zk-demo, zookeeper]
  3. [zk: 127.0.0.1:2181(CONNECTED) 10] ls /Cat
  4. [TomCat]
Znode 节点类似于 Unix 文件系统,但也有自己的特性:
1 Znode 兼具文件和目录特点 既像文件一样维护着数据、信息、时间戳等数
据,又像目录一样可以作为路径标识的一部分,并可以具有子 Znode 。用户对
Znode 具有增、删、改、查等操作;
2 Znode 具有原子性操作 读操作将获取与节点相关的所有数据,写操作也将替
换掉节点的所有数据;
3 Znode 存储数据大小有限制 每个 Znode 的数据大小至多 1M ,但是常规使用
中应该远小于此值;
4 Znode 通过路径引用 如同 Unix 中的文件路径。路径必须是绝对的,因此他们
必须由斜杠字符来开头。除此以外,他们必须是唯一的,也就是说每一个路径只有一
个表示,因此这些路径不能改变。
#### 1.3.3.2 Znode 节点类型
        Znode 有两种,分别为临时节点和永久节点,节点的类型在创建时即被确定,并且不能改变。
  • 临时节点:该节点的生命周期依赖于创建它们的会话。一旦会话结束,临时节点将被自动删除,当然可以也可以手动删除。临时节点不允许拥有子节点。
  • 永久节点:该节点的生命周期不依赖于会话,并且只有在客户端显式执行删除操作的时候,才能被删除。
        Znode 还有一个序列化的特性,如果创建的时候指定的话,该 Znode 的名字后面会 自动追加一个递增的序列号。序列号对于此节点的父节点来说是唯一的,这样便会记 录每个子节点创建的先后顺序。因此组合之后, Znode 有四种节点类型:
PERSISTENT :永久节点
EPHEMERAL :临时节点
PERSISTENT_SEQUENTIAL :永久顺序节点
EPHEMERAL_SEQUENTIAL :临时顺序节点
为了对节点类型有更清楚的认识,在命令行下来模拟创建一个临时节点:
1 )首先 连接 zk1-leader192.168.182.153 实例:
  1. # 连接zk1
  2. ./bin/zkCli.sh -server 127.0.0.1:2181
2 )创建一个临时节点:
  1. # -e 表示该节点为临时节点
  2. [zk: 127.0.0.1:2181(CONNECTED) 12] create -e /Dog/Puppy 123
  3. Created /Dog/Puppy
3 )连接 zk2 实例,查看该临时节点是否同步:
  1. # 连接zk2
  2. ./bin/zkCli.sh -server 127.0.0.1:2181
  3. # 查询/Dog/Puppy节点值
  4. [zk: 127.0.0.1:2182(CONNECTED) 2] get /Dog/Puppy
  5. 123
4 )断开 zk1 实例的会话:
  1. [zk: 127.0.0.1:2181(CONNECTED) 16] quit
  2. WATCHER::
  3. WatchedEvent state:Closed type:None path:null
  4. 2022-03-15 15:39:55,807 [myid:] - INFO [main:ZooKeeper@1232] -
  5. Session: 0x1000c3279ae0000 closed
  6. 2022-03-15 15:39:55,807 [myid:] - INFO [main-EventThread:ClientCnxn$EventThread@570] - EventThread shut down
  7. for session: 0x1000c3279ae0000
  8. 2022-03-15 15:39:55,810 [myid:] - ERROR [main:ServiceUtils@42] -
  9. Exiting JVM with code 0
5 )在 zk2 上查看该节点:
  1. [zk: 127.0.0.1:2182(CONNECTED) 3] get /Dog/Puppy
  2. org.apache.zookeeper.KeeperException$NoNodeException:
  3. KeeperErrorCode = NoNode for /Dog/Puppy
        可以看到/Dog/Puppy 临时节点随着 zk1 实例会话的退出消失了,这就是临时节点的
特性, zk1 创建的临时节点会随着 zk1 实例连接的退出而消失,永久节点则只能通过
delete /Dog( 节点名 ) 删除才会消失。

### 1.3.4 Zookeeper 一致性协议原理

        上图为 ZooKeeper 的整体架构。 ZooKeeper Service 是服务端集群,也是整个组件
的核心,客户端的读写请求都是它来处理。 ZK 下载安装章节模拟的 zk1/zk2/zk3
可以认为是一个 ZK 服务端集群,在 zk2 中写入的节点值,在 zk1 zk3 实例中
也能读到这个节点值, zk2 会话退出后临时节点在其他服务器上也同样消失了。
#### 1.3.4.1 Zookeeper 服务端角色

  • Leader 一个 ZooKeeper 集群同一时间只会有一个实际工作的 Leader,它会发起并维护与各 Follwer Observer 间的心跳。所有的写操作必须要通过Leader 完成再由 Leader 将写操作广播给其它服务器。
  • Follower 一个 ZooKeeper 集群可能同时存在多个 Follower,它会响应 Leader的心跳。Follower 可直接处理并返回客户端的读请求,同时会将写请求转发给Leader 处理,并且负责在 Leader 处理写请求时对请求进行投票。
  • Observer 角色与 Follower 类似,但是无投票权。
1 、早期的 ZooKeeper 集群服务运行过程中,只有 Leader 服务器和 Follow 服务器
2 、随着集群规模扩大, follower 变多, ZK 在创建节点和选主等事务性请求时,需要
一半以上节点 AC ,所以导致性能下降写入操作越来越耗时, follower 之间通信越来越
耗时
3 、为了解决这个问题,就引入了观察者,可以处理读,但是不参与投票。既保证了
集群的扩展性,又避免过多服务器参与投票导致的集群处理请求能力下降
#### 1.3.4.2 一致性协议 -- ZAB

        ZAB协议目的:保证集群中各个节点读写数据的一致性和可用性。

        这种协议支持崩溃恢复,并基于主从模式,同一时刻只有一个 Leader ,所 有的写操作都由 Leader 节点主导完成,而读操作可通过任意节点完成,因此 ZooKeeper 读性能远好于写性能,更适合读多写少的场景,有效解决了 zookeeper 集群 崩溃恢复 ,以及 主从同步数据 的问题 一旦 Leader 节点宕机, ZAB 协议的崩溃恢复机制能自动从 Follower 节点中重新选 出一个合适的替代者,即新的 Leader ,该过程即为领导选举。领导选举过程,是 ZAB 协议中最为重要和复杂的过程。
#### 1.3.4.3 Zookeeper Leader -- 领导选举算法

领导选举算法指:基于 TCP FastLeaderElection,其他选举算法被废弃。集群模式下 zoo.cfg 配置文件中有参数可配选举算法:

#### 1.3.4.4 FastLeaderElection 选举参数解析
         选举算法参数myid :每个 ZooKeeper 服务器,都需要在数据文件夹下创建一 个名为 myid 的文件,该文件包含整个 ZooKeeper 集群唯一的 ID (整数)。例如, 我们部署的 zk1/zk2/zk3 三个实例,其 myid 分别为 1 2 3 ,在配置文件中其 ID hostname 必须一一对应,如下所示。在该配置文件中, server. 后面的 id 即为 myid 。该参数在选举时如果无法通过其他判断条件选择 Leader ,那么将该 ID 的大 小来确定优先级。
  1. # 修改主机名
  2. hostnamectl set-hostname 新主机名
  3. ## 示例:我起的名是"1"
  4. hostnamectl set-hostname 1
  5. # 重启
  6. reboot
  7. # 在其他主机清单里备注
  8. vim /etc/hosts
  9. 192.168.182.131 1
  10. 192.168.182.153 2
  11. 192.168.182.154 3
        zxid:用于标识一次更新操作的 ID 。为了保证顺序性,该 zxid 必须单调递增,因此 ZooKeeper 使用一个 64 位的数来表示,高 32 位是 Leader epoch ,从 1 开始, 每次选出新的 Leader epoch 加一。低 32 位为该 epoch 内的序号,每次有写操作 32 位加一,每次 epoch 变化,都将低 32 位的序号重置。这样保证了 zxid 的全 局递增性。之前看到过有博主使用中国古代的年号来解释这个字段,非常形象:万历 十五年,万历是 epoch ,十五年是序号 选票数据结构 ,每个服务器在进行选举时, 发送的选票包含如下关键信息:
struct Vote {
        logicClock          // 逻辑时钟,表示该服务器发起的第多少轮投票
        state         // 当前服务器的状态 ( LOOKING- 不确定 Leader 状态
FOLLOWING- 跟随者状态 LEADING- 领导者状态 OBSERVING- 观察者状态)
        self_myid         // 当前服务器的 myid
        self_zxid         // 当前服务器上所保存的数据的最大 zxid
        vote_myid          // 被推举的服务器的 myid
        vote_zxid         // 被推举的服务器上所保存的数据的最大 zxid
}
节点服务器状态 ,每个服务器所处的状态时下面状态中的一种:
  • LOOKING 不确定 Leader 状态。该状态下的服务器认为当前集群中没有Leader,会发起 Leader 选举。
  • FOLLOWING 跟随者状态。表明当前服务器角色是 Follower,并且它知道Leader 是谁。
  • LEADING 领导者状态。表明当前服务器角色是 Leader,它会维护与 Follower间的心跳。
  • OBSERVING 观察者状态。表明当前服务器角色是 Observer,与 Folower 唯一的不同在于不参与选举,也不参与集群写操作时的投票。
选举投票流程:
注: 即使选票超过半数了,选出 Leader 服务实例了,也不是 立刻结束,而是等待 200ms ,确保没有丢失其他服务的更优的选票。
#### 1.3.4.5 崩溃恢复
假如 Zookeeper 当前的主节点挂掉了,集群会进行崩溃恢复。 ZAB 的崩溃恢复分成三 个阶段:
1.Leader election:选举阶段
        此时集群中的节点处于Looking 状态。它们会各自向其他节点发起投票,投票当中包 含自己的服务器 ID 和最新事务 ID ZXID );接下来,节点会用自身的 ZXID 和从其他节点接收到的 ZXID 做比较,如果发现别 ZXID 比自己大,也就是数据比自己新,那么就重新发起投票,投票给目前已知最 大的 ZXID 所属节点;
        每次投票后,服务器都会统计投票数量,判断是否有某个节点得到半数以上的投票。 如果存在这样的节点,该节点将会成为准 Leader ,状态变为 Leading 。其他节点的状态变为 Following
2.Discovery:发现阶段
        用于在从节点中发现最新的ZXID 和事务日志。或许有人会问:既然 Leader 被选为主 节点,已经是集群里数据最新的了,为什么还要从节点中寻找最新事务呢?这是为了 防止某些意外情况,比如因网络原因在上一阶段产生多个 Leader 的情况。所以这一 阶段, Leader 集思广益,接收所有 Follower 发来各自的最新 epoch 值。 Leader 从中 选出最大的 epoch ,基于此值加 1 ,生成新的 epoch 分发给各个 Follower 。各个 Follower 收到全新的 epoch 后,返回 ACK Leader ,带上各自最大的 ZXID 和历史事 务日志。 Leader 选出最大的 ZXID ,并更新自身历史日志。
3.Synchronization :同步阶段
        把Leader 刚才收集得到的最新历史事务日志,同步给集群中所有的 Follower 。只有 当半数 Follower 同步成功,这个准 Leader 才能成为正式的 Leader

## 1.4 消息队列

        即Message Queue ,我们从数据结构来理解的话, Queue 是一种先进先出的数据结 构。所以意思就是将信息(通常指传输的数据)放入一个队列中。消息队列也通常称 为消息中间件。

### 1.4.1 消息队列适合场景

        消息队列:它主要用来暂存生产者生产的消息,供后续其他消费者来消费。它的功能 主要有两个: a. 暂存 ( 存储 ) b. 队列 ( 有序:先进先出 ) 。其他大部分场景对数据的消 费没有顺序要求,主要用它的暂存能力 。从目前互联网应用中使用消息队列的场景 来看,主要有以下三个:
1. 异步处理数据 :传递者有固定消息队列放置信息,接受者可在不同时间获取信息。
2. 系统应用解耦 :上游生产消息模块先将数据放入消息队列,其他服务从消息队列获取数据,在后续要扩展其他系统使用时,只需通过新的消费者接入消息队列,上游模块不需做任何改动。
        软件设计的原则就是:高内聚,低耦合。 在没有引入消息队列模 块时,应用模块之间通常都是直接进行消息传递过程的。我们以 12306 购票过程为 例,用户购票下单的过程与车票库存直接相关。当用户下单购票时,会同时调用车票 库存模块,使车票余量减少。这样就使得下单模块与车票库存模块存在强耦合关系。 当车票库存模块服务不稳定时,例如,刚好要下单时库存模块就暂停服务了,这样一 来就可能直接引发下单失败等一系列不良体验。而若加入了消息队列,下单请求将会 直接发送到消息队列中暂存,待到车票库存模块恢复工作后再主动去消费下单请求。
3. 业务流量削峰:高峰期的数据先传入消息队列暂存, 供下游系统根据自己的消费能力来逐步处理。 同时这类消息往往对时延的要求不是很高,比较适合采用消息队列暂存。这样 可以保证服务器在任何情况下都能正常服务,只将这种流量爆增的代价转 架到客户端头上使其多排排队。

小结:

        消息队列主要适用于处理对消息要求不是很实时,同时一份数据可能会多处使用的场景,不同 的使用方处理速率不同。

### 1.4.2 消息队列主流产品

ActiveMQ ActiveMQ Apache 软件基金会基于 Java 语言开发的一个开源的消息代理。能够支持多个客户机或服务器。计算机集群等属性支持 ActiveMQ 来管理通 信系统。 RabbitMQ RabbitMQ 是实现了高级消息队列协议( AMQP )的开源消息代理软 件(亦称面向消息的中间件)。 RabbitMQ 服务器是用 Erlang 语言编写的,而集群 和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通 讯的客户端库。 RabbitMQ 支持多种消息传递协议、传递确认等特性。
Kafka Apache Kafka 是由 Apache 软件基金会开发的一个开源消息系统项目,由Scala 写成。 Kafka 最初是由 LinkedIn 开发,并于 2011 年初开源。 2012 10 月从 Apache Incubator 毕业。该项目的目标是为处理实时数据提供一个统一、高通量、 低等待的平台。 Kafka 是一个分布式的、分区的、多复本的日志提交服务。它通过一 种独一无二的设计提供了一个消息系统的功能。
RocketMQ Apache RocketMQ 是一个分布式消息和流媒体平台,具有低延迟、强一致 、高性能和可靠性、万亿级容量和灵活的可扩展性。它有借鉴 Kafka 的设计 思想,但不是 kafka 的拷贝。
Pulsar Apache Pulsar Apache 软件基金会顶级项目,是下一代云原生分布式 消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设 计,支持多租户、持久化存储、多机房跨区域数据复制,具有 强一致性 、高吞吐、低 延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和 计算最佳解决方案。

对比一览表:

        上图中,ActiveMQ 和 RabbitMQ 二者属于同一量级,在吞吐量上比后面三者差一个量级; 其次,支持强一致性的有 RocketMQ Pulsar ,在对消息一致性要求比较高的场景 可以采用这些产品。同时 kafka 虽然会有数据丢失的风险,但其吞吐量比较高同时社 区非常活跃,在大数据的绝大部分场景里,都可以采用 kafka 最后 kafka RocketMQ Pulsar 这几款是基于磁盘存储数据的,内存加速访问。而 ActiveMQ RabbitMQ 采用内存存储数据,也支持数据持久化到磁盘。

核心模型:

        生产者通常会有多个,消息队列集群内部也会有多个分区队列,所以在生产者发送数据时,通常会存在负载均衡的一些策略,常见的有 key hash 轮询 随机 等方式。
        其本质是一条数据,被消 息队列封装后也被称为一条消息,该条消息只能发送到其消息队列集群内部的一个分 区队列中。因此只需按照一定的策略从多个队列中选择一个队列即可
        消息队列集群: 消息队列集群是消息队列这种组件实现中的核心中的核心,它的主 要功能是 存储消息 过滤消息 分发消息
        存储消息主要指生产者生产的数据需要存储到消息队列内部;存储消息可以说是消息队列的核心,一个消息队列吞吐量的高低、性能优劣都和它的存储模型脱不开关 系。
        过滤消息只指消息队列可以通过一定的规则或者策略进行消息的过滤,该项能力通常也被称为消息路由;过滤消息属于高阶的特性功能, AMQP 协议对这些能力抽象的 比较完备,部分消息队列可以选择性的实现该协议来达到该功能。 分发消息是指消息队列通常需要将消息分发给处理同一逻辑的多个消费者处理或者处 理不同逻辑的不同消费者处理。
        分发消息可以说和消费者模型想挂钩,这块会涉及到不同的数据获取方式,也会涉及到消费者消费消息的模型。 此外绝大部分的消息队列也都支持对消息进行分类,分类的标签称为 topic( 主题 ) 一个 topic 中存放的是同一类消息。
        消费者: 最终消息队列存储的消息会被消费者消费使用,消费者也可以看做消息队列中数据的输出方。消费者通常有两种方式从消息队列中获取数据: 推送 (push) 拉取 (pull) 数据 。其次消费者也经常是作为客户端的角色出现在在消息队列这种 组件中。

### 1.4.3 获取数据的推、拉两种方案对比

消费者在从消息队列中获取数据时,主要有两种方案:
1. 等待推送数据
2. 主动拉取数据
目前的消息队列实现时,都会选择支持两种的至少一种。下图为两种方案对比:

### 1.4.4 消息队列消费模型

        消费者模型其实是一个 1:N:M 的关系,一份数据被 N 个消费者组独立使用,
每个消费者组中有 M 个消费者进行分摊消费。
        这种模型也称为发布订阅模型 ,对于一条消息而言,组间广播、组内单播。一条
消息只能被一个消费者组中的一个消费者使用。在消费者组内部也存在一些负载均衡
的策略。常用的有: 轮询 随机 hash 一致性 hash 等方案。

## 2.1 Kafka 基础知识及环境搭建

### 2.1.1 Kafka 介绍

        Kafka是由 LinkedIn 团队开发的一款 发布 / 订阅 模型的 分布式流处理平台 ,主要用于 对数据流的存储与处理。 Kafka 既然被当作消 息队列来使用,那么它的核心功能自然就是高性能的消息发送与消费了。同时 Kafka 还是一个依赖于 Zookeeper 的分布式消息队列。
Kafka 是一个分布式流媒体平台,流媒体平台有三个关键功能:
  • 发布和订阅记录流,类似于消息队列或企业消息传递系统。
  • 以容错持久的方式存储记录流。
  • 处理记录发生的流。
Kafka 通常用于两大类应用:
  • 构建可在系统或应用程序之间可靠获取数据的实时流数据管道
  • 构建实时流应用程序,用于转换或响应数据流
kafka 四个核心 API
  • 应用程序使用producer API发布消息到1个或多个topic中。
  • 应用程序使用consumer API来订阅一个或多个topic,并处理产生的消息。
  • 应用程序使用streams API充当一个流处理器,1个或多个topic消费输入流,产生一个输出流到1个或多个topic,有效地将输入流转换到输出流。
  • connector API允许构建或运行可重复使用的生产者或消费者,topic链接到现有的应用程序或数据系统。
Kafka 的特性 :
  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition,consumer group partition进行consume操作。
  • 可扩展性:kafka集群支持热扩展
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
  • 高并发:支持数千个客户端同时读写
Kafka 的使用场景:
  • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoopHbaseSolr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafkatopic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如spark streamingstorm
  • 事件源
kafka 拓扑
kafka 是依赖 zookeeper 集群的,一般最少也要三台服务器来实现 HA 。分三层:
  1. Producers:消息生产者,push消息给Brokers.发送时根据不同topic选择不同分区(在Broker上)。
  2. Brokers:注册在zookeeper节点上。
  3. Consumers:消息消费者,从brokers上根据订阅的topic选择不同分区,poll 数据,执行消费。

### 2.1.2 Kafka 消息引擎模型

Kafka 是一款开源的消息引擎系统。

最常见的两种消息引擎模型是点对点模型发布/订阅模型
        点对点模型是基于队列提供消息传输服务的,该模型定义了消息队列、发送者和接收 , 提供了一种点对点的消息传递方式,即发送者发送每条消息到队列的指定位置, 接收者从指定位置获取消息,一旦消息被消费, 就会从队列中移除该消息 。 每条消 息由一个发送者生产出来, 且只被一个消费者处理一一发送者和消费者之间是一对 一的关系。
        发布/订阅模型与前一种模型不同, 它有主题(topic) 的概念。 这种模型也定义了类 似于生产者/消费者这样的角色,即发布者和订阅者,发布者将消息生产出来发送到 指定的 topic 中, 所有订阅了该 topic 的订阅者都可以接收到该 topic 下的所有消息, 通常具有相同订阅 topic 的所有订阅者将接收到同样的消息。

基本概念:

消息
既然 Kafka 是消息引擎,这里的消息就是指 Kafka 处理的主要对象
Broker
broker 指一个 kafka 服务器。如果多个 broker 形成集群会依靠 Zookeeper 集群进行服务的协调管理。  生产者发送消息给 Kafka 服务器。消费者从 Kafka 服务器读取消息。
Topic Partition
topic 代表了一类消息, 也可以认为是消息被发送到的地方。 通常我们可以使用topic 来区分实际业务, 比如业务 A 使用 一个 topic , 业务 B 使用另外一个 topic Kafka 中的 topic 通常都会被多个消费者订阅, 因此出于性能的考量 , Kafka 并不 topic-message 的两级结构, 而是采用了 topic-partition-message 的三级结构 来分散负载。 从本质上说,每个 Kafka topic 都由若干个 partition 组成。
如图: topic 是由多个 partition 组成的, 而 Kafka partition 是不可修改的有序消息序列, 也可以说是 有序的消息日志。 每个 partition 有自己专属的 partition 号, 通常是从 0 开始的。 用户对 partition 唯一能做的操作就是 在消息序列的尾部 追加写入消息。 partition 上的每条消息都会被分配一个唯一的序列号 该序列号被称为位移( offset )是从 0 开始顺序递增 的整数。 位移信息可以唯一 定位到某 partition 下的一条消息 。

### 2.1.3 kafka 设计分区原因

         解决伸缩性的问题。假如一个broker积累了太多的数据以至于单台 Broker 机器都无法容纳了,此时应该怎么办呢?一个很自然的想法就是,能否把数据分割成多份保存在不同的 Broker 上?所以kafka设计了分区。

生产者和消费者
向主题发布消息的客户端应用程序称为生产者( Producer ),生产者程序通常持续不断地 向一个或多个主题发送消息,而订阅这些主题消息的客户端应用程序就被称 为消费者 ( Consumer )。和生产者类似,消费者也能够同时订阅多个主题的消息
消费者组
Consumer Group 是指组里面有多个消费者或消费者实例,它 们共享一个公共的ID ,这个 ID 被称为 Group ID 。组内的 所有消费者协调在一起来消费订阅主题的所 有分区( Partition )。当然,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。

 Consumer Group 三个特性。」

1. Consumer Group 下可以有一个或多个 Consumer 实例。
2. Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group
3. Consumer Group 下所有实例订阅的主题的单个分区, 只能分配给组内的某个 Consumer 实例消费。这个分区 当然也可以被其他的 Group 消费。
        Kafka 仅仅使用 Consumer Group 这一种机制,却同时实现了传统消息引 擎系统的 两大模型:如果所有实例都属于同一个 Group , 那么它实现的就是点对点模型;如 果所有实例分别属于不 同的 Group ,那么它实现的就是发布 / 订阅模型。

### 2.1.4 kafka 使用场景

日志收集:一个公司可以用 Kafka 收集各种服务的 log ,通过 kafka 以统一接口服务的方式开放给各种 consumer ,例如 hadoop Hbase Solr 等。
消息系统:解耦和生产者和消费者、缓存消息等。
用户活动跟踪 Kafka 经常被用来记录 web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka topic 中,然后订 阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 hadoop 、数据仓库中做离 线分析和挖掘。
运营指标 Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

 「在实际使用场景中,一个 Group 下该有多少个 Consumer 实例呢?」

        理想情况下,Consumer 实例的数量应该等于该 Group 订阅主题的分区 总数。举个简单的例子,假设一个 Consumer Group 订阅了 3 个 主题,分别是 ABC,它们的分区数依次是 123, 那么通常情况下,为该 Group 设置 6 Consumer 实例是 比较理想的情形,因为它能最大限度地实现高伸缩性。 

### 2.1.5 kafka 速度快的理由

「顺序读写」
        kafka的消息是不断追加到文件中的,这个特性使 kafka 可以充分利用磁盘的顺序读写性能 。 顺序读写不需要硬盘磁头的寻道时间,只需很少的扇区旋转时间,所以速度远快于随机读写。
「零拷贝」
        服务器先将文件复制到内核空间,再复制到用户空间,最后再复制到内核空间并通过网卡发送出去,而零拷贝则是直接从内核到内核再到网卡,省去了用户空间的复制Zero copy 对应的是 Linux sendfile 函数,这个函数会接受一个 offsize 来确定从哪里 开始读取。现实中,不可能将整个文件全部发给消费者,他通过消费者传递过来的偏 移量来使用零拷贝读取指定内容的数据返回给消费者 。
「分区」
        kafka中的 topic 中的内容可以被分为多分 partition 存在 , 每个 partition 又分为多个段segment, 所以每次操作都是针对一小部分做操作,很轻便,并且增加 并行操作 的能 力 。
「批量发送」
        kafka允许进行批量发送消息, producter 发送消息的时候,可以将消息缓存在本 地,等到了固定条件发送到 kafka
        1. 等消息条数到固定条数
        2. 一段时间发送一次
「数据压缩」
        Kafka还支持对消息集合进行压缩, Producer 可以通过 GZIP Snappy 格式对消息集合进行压缩。 压缩的好处就是减少传输的数据量,减轻对网络传输的压力。 Producer 压缩之后,在 Consumer 需进行解压,虽然增加了 CPU 的工作,但在对大 数据处理上,瓶颈在网络上而不是 CPU ,所以这个成本很值得。

### 2.1.6 kafka 集群搭建

实验环境:(搭建 elk 平台的 kafka 集群,本节部署 kafka
IPServerbroker.id
192.168.182.131Kafka+ZooKeeper1
192.168.182.153Kafka+ZooKeeper2
192.168.182.154Kafka+ZooKeeper3
1. 在三个节点上解压程序:
  1. # 先创建解压目录
  2. mkdir /opt/kafka
  3. # 再跟,在根目录上传压缩包
  4. cd /root/
  5. tar xf kafka_2.11-2.2.1.tgz -C /opt/kafka/
  6. 创建数据目录
  7. mkdir /opt/kafka/data

2.在192.168.182.131节点上修改配置文件并同步到其他两个主机上:

# 查看配置文件内容

grep "^[a-z]" /opt/kafka/kafka_2.11-2.2.1/config/server.properties
  1. vim /opt/kafka/kafka_2.11-2.2.1/config /server.properties
  2. ## 内容如下:
  3. # 唯一标识节点,不能相同,不然会节点冲突。
  4. broker.id=1
  5. # 本机的kafka ip
  6. listeners=PLAINTEXT://192.168.182.131:9092
  7. # broker 处理消息的最大线程数
  8. num.network.threads=3
  9. # broker处理磁盘IO 的线程数
  10. num.io.threads=8
  11. # socket的发送缓冲区,socket的调优参数
  12. socket.send.buffer.bytes=102400
  13. SO_SNDBUFF
  14. # socket的接受缓冲区,socket的调优参数SO_RCVBUFF
  15. socket.receive.buffer.bytes=102400
  16. # socket请求的最大数值,防止 serverOOM
  17. socket.request.max.bytes=104857600
  18. # 日志路径
  19. log.dirs /opt/kafka/kafka_2.11-2.2.1/kafka-logs
  20. # kafka数据的存放地址,多个地址的话用逗号分割
  21. num.partitions=3
  22. # 用于在启动时,用于日志恢复的线程个数,默认是1
  23. num.recovery.threads.per.data.dir=1
  24. # 副本数或备份因子
  25. offsets.topic.replication.factor=1
  26. # 事务主题的复制因子(设置更高以确保可用性)
  27. transaction.state.log.replication.factor=1
  28. # 覆盖事务主题的min.insync.replicas配置
  29. transaction.state.log.min.isr=1
  30. # 日志保存的小时数
  31. log.retention.hours=60
  32. # 日志分配的字节数
  33. log.segment.bytes=1073741824
  34. # 文件大小检查的周期时间
  35. log.retention.check.interval.ms=300000
  36. # zookeeper集群的地址
  37. zookeeper.connect=192.168.182.131:2181,192.168.182.153:2181,192.168.182.154:2181
  38. # ZooKeeper的连接超时时间
  39. zookeeper.connection.timeout.ms=6000
  40. # 让coordinator推迟空消费组接收到成员加入请求后立即开启的rebalance
  41. group.initial.rebalance.delay.ms=0

# 使用scp命令在192.168.182.131主机将配置文件推到另外两台主机

scp /opt/kafka/kafka_2.11-2.2.1/config/ server.properties root@192.168.182.153:/opt/kafka/kafka_2.11-2.2.1/config/
scp /opt/kafka/kafka_2.11-2.2.1/config/ server.properties root@192.168.182.154:/opt/kafka/kafka_2.11-2.2.1/config/

3.其他两个节点修改broker.id及监听IP

#192.168.182.153
broker.id=2
listeners=PLAINTEXT://192.168.182.153:9092
#192.168.182.154
broker.id=3
listeners=PLAINTEXT://192.168.182.154:9092

 4.启动服务

./bin/kafka-server-start.sh -daemon config/server.properties
# 其他两台节点启 动方式相同
# 或nohup命令后台启动--最好用这个
nohup bin/kafka-server-start.sh config/server.properties &

注意:

Kafka 需要用到 ZooKeepr 所以需要先启动一个 ZooKeepr 服务端,如果没有单独的ZooKeeper 服务端可以使用 Kafka 自带的脚本快速启动一个单节点 ZooKeepr 实例
# 启动 zookeeper 服务端实例
cd /opt/zookeeper/apache-zookeeper-3.7.1-bin/
./ bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动 kafka 服务端实例
cd /opt/kafka/kafka_2.11-2.2.1/
nohup ./bin/kafka-server-start.sh config/server.properties &

 验证操作:

1 、新建一个主题 topic: kafka-topics.sh
创建一个只有一个分区和一个备份名为 “test” Topic
# ./bin/kafka-topics.sh --create --zookeeper 192.168.182.131:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test" .

创建结果展示:

2 、查看主题:
运行 list topic 命令,查看该主题:
 ./bin/kafka-topics.sh --list --zookee per 192.168.182.131:2181

查看结果:

同时也可以在 zookeeper 中查看:
cd /opt/zookeeper/apache-zookeeper-3.7.1-bin/
./bin/zkCli.sh -serv er 192.168.182.131:2181

查看结果:

        Kafka带有一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息 发送到 Kafka 集群。默认情况下,每行将作为单独的消息发送。
3 、运行 producer 生产者 , 然后在控制台输入几条消息到服务器--192.168.182.131上运行
  1. cd /opt/kafka/kafka_2.11-2.2.1/
  2. ./bin/kafka-console-producer.sh --broker-list 192.168.182.131:9092 --topic test
  3. # 内容:
  4. >111
  5. >this is a test
  6. >this is another message
  7. >hello
输入信息展示:
4 、消费消息: kafka-console-consumer.sh--192.168.182.153上运行
        Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来在另一台机器上或者不同终端上执行此命令,这样就能将消息键入生产者终端,并 将它们显示在消费者终端。
  1. cd /opt/kafka/kafka_2.11-2.2.1
  2. ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.182.131:9092 --topic test --from-beginning
消费者查看信息展示(动态同步):
注:接收到的消息可能会有延迟比较慢
5 、查看 topic详细信息--192.168.182.153上运行
./bin/kafka-topics.sh --describe --zookeeper 192.168.182.131:2181 --topic test
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
## 内容如下:
Topic:test PartitionCount:1 ReplicationFactor:1
Configs:
        Topic: test Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: 主题名称
Partition: 分片编号
Leader: 该分区的 leader 节点
Replicas: 该副本存在于哪个 broker 节点
Isr: 活跃状态的 broker
topic信息展示:

## 3.1 ELK+Kafka+Filebeat 集群架构

日志分析平台搭建原因:

        随着业务量的增长,每天业务服务器将会产生上亿条的日志,单个日志文件达几个GB,这时我们发现用 Linux 自带工具, cat grep awk 分析越来越不满足正常业务进行,而且除 了服务器日志,还有程序报错日志,分布在不同的服务器,查阅繁琐。
待解决的痛点 :
1 、大量不同种类的日志成为了运维人员的负担,不方便管理 ;
2 、单个日志文件巨大,无法使用常用的文本工具分析,检索困难 ;
3 、日志分布在多台不同的服务器上,业务一旦出现故障,需要一台台查看日志。
为了解决以上困扰 :
在之前的搭建 elk 环境中,日志的处理流程为: filebeat --> logstash -->elasticsearch ,随着业务量的增长,需要对架构做进一步的扩展,引入 kafka 集群。
日志的处理流程变为: filebeat --> kafka --> logstash --> elasticsearch

日志分析平台搭建:

架构解读:(整个架构从左至右,总共分为 5 层)
第一层:数据采集层
最左边的是业务服务器集群,上边安装了 filebeat 做日志采集,同时把采集的日志分别发送给 logstash 服务。
第二层:数据处理层,数据缓存层
Logstash 服务把接受到的日志经过格式处理,转存到本地的 kafka+zookeeper 集群中。
第三层:数据转发层
这个单独的 logstash 节点会实时去 kafka+zookeeper 集群拉取数据,转发至 ES DataNode
第四层:数据持久化存储
ES DataNode 会把收到的数据,写磁盘,建索引库。
第五层:数据检索,数据展示
ES Master + kibana 主要协调 ES 集群,处理数据检索请求,数据展示。

### 3.1.1 环境部署

主机角色安装软件所属服务层
192.168.182.131日志采集Filebeat数据采集层
192.168.182.131日志缓存ZooKeeper+Kafka+Logstash数据处理层,数据缓存层
192.168.182.153日志采集和缓存及存储Filebeat+ES+ZooKeeper+Kafka数据持久化存储
192.168.182.154日志采集和存储Filebeat+ES+ZooKeeper+Kafka数据持久化存储
192.168.182.131日志检索和展示Es+Kibana数据检索,数据展示
前面已经把 zookeeper+kafka 集群已经搭建配置好了,接下来就是配置 整个平台的 ELK+filebeat 组件。
注:每台服务器都要有Filebeat,可以去Linux--filebeat 日志采集及实战练习--EFK练习-CSDN博客找filebeat下载及安装配置。
#### 3.1.1.1 配置 Filebeat 输出到 Kafka 集群
修改Filebeat配置文件,配置输出到 Kafka--三台服务器依次配置
  1. # 进入目录
  2. cd /opt/beats/filebeat-6.5.4-linux-x86_64
  3. # 编辑配置文件
  4. vim beats-log-nginx-access.yml
  5. ## 内容如下:
  6. filebeat.prospectors:
  7. - type: log
  8. enabled: true
  9. paths:
  10. - /var/log/nginx/access.log
  11. output.kafka:
  12. enabled: true
  13. hosts: ["192.168.182.131:9092", "192.168.182.153:9092","192.168.182.154:9092"]
  14. topic: 'nginx_access-log'
  15. # 启动
  16. ./filebeat -e -c beats-log-nginx-access.yml
  17. ## 若没报错请开启为后台启动
  18. ## -e:标准输出,禁用系统日志和文件输出
  19. ## -c:指定配置文件
  20. ### 以下含义:错误日志输出到标准输出,写入kafka.log日志
  21. nohup ./filebeat -e -c beats-log-nginx-access.yml > kafka.log 2>&1 &

启动结果:

        重启filebeat服务,并尝试向/var/log/nginx/access.log 文件中追加一些测试数据,在kafka 集群任 意节点查看主题,并消费这些数据。
  1. # Filebeat客户端模拟生成日志:
  2. echo "111" >> /var/log/nginx/access.log
  3. # 在kafka集群任意节点查看是否生成对应topic: --192.168.182.154
  4. cd /opt/kafka/kafka_2.11-2.2.1/
  5. ./bin/kafka-topics.sh --list --zookeeper localhost:2181

  1. # 尝试能否消费该主题下的数据 --192.168.182.153
  2. cd /opt/kafka/kafka_2.11-2.2.1/
  3. ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.182.154:9092 --topic nginx_access-log --from-beginning
  4. # 上条语句输入后在192.168.182.154输入以下命令,观察192.168.182.153变化
  5. echo "666" >> /var/log/nginx/access.log
  6. echo "888" >> /var/log/nginx/access.log
153动态变化展示:
# 如果不指定 codec ,默认为 json 格式
到此filebeat输出到kafka配置完成
#### 3.1.1.2 Logstash 从 Kafka 读取数据,并输出到 elasticsearch
先配置输出到标准输出:
  1. # 进入192.168.182.131的logstash配置目录
  2. cd /opt/logstash/logstash-6.5.4/config
  3. # 编辑配置文件
  4. vim kafka.conf
  5. ## 内容如下:
  6. input {
  7. kafka {
  8. codec => "json"
  9. bootstrap_servers => "192.168.182.131:9092,192.168.182.153:9092,192.168.182.154:9092"
  10. topics => "nginx_access-log"
  11. consumer_threads => 2
  12. }
  13. }
  14. output {
  15. stdout { codec => rubydebug }
  16. }

检查配置语法,没问题启动 Logstash:

  1. # 192.168.182.131中检查并启动
  2. cd /opt/logstash/logstash-6.5.4/
  3. # 检查语法 -t
  4. ./bin/logstash -f config/kafka.conf -t
  5. # 启动
  6. ./bin/logstash -f config/kafka.conf --config.reload.automatic
  7. # 开启后没错误就放置在此页面
同样的模式向日志文件中插入一条数据,查看 logstash 输出是否正常:
  1. # 192.168.182.154中,Filebeat 客户端模拟生成日志
  2. echo "999" >> /var/log/nginx/access.log

192.168.182.131中Logstash端输出结果:

到目前为止,整体流程已经走通,kafka集群成功的加入到elk平台中。
#### 3.1.1.3 将 Filebeat 采集的日志通过 Logstash 输出到 Elasticsearch 和 Kibana
修改配置文件后重新启动服务查看 head 插件是否可以查看到数据,正常如下:
  1. # 启动 elasticsearch 集群--三台主机都启动
  2. # 切换用户
  3. su - elsearch
  4. # 启动
  5. cd /opt/es/elasticsearch-6.5.4/
  6. ./bin/elasticsearch -d
  7. # 查看日志是否启动成功
  8. tail -f logs/es-clusters.log
  1. # 修改 Logstash 配置文件
  2. cd /opt/logstash/logstash-6.5.4/config
  3. vim kafka.conf
  4. ## 内容如下:
  5. input {
  6. kafka {
  7. codec => "json"
  8. bootstrap_servers => "192.168.182.131:9092,192.168.182.153:9092,192.168.182.154:9092"
  9. topics => "nginx_access-log"
  10. consumer_threads => 2
  11. }
  12. }
  13. output {
  14. elasticsearch {
  15. hosts => ["192.168.182.131:9200","192.168.182.153:9200","192.168.182.154:9200"]
  16. }
  17. }
  18. # 启动
  19. ./bin/logstash -f config/kafka.conf --config.reload.automatic
  1. # 192.168.182.154 filebeat 客户端模拟生成日志:
  2. 这里我们将/var/log/messages的倒数100行日志输出重定向到这个日志中
  3. tail -100 /var/log/messages >> /var/log/nginx/access.log
修改配置文件后重新启动 Logstash 服务查看浏览器中 elasticsearch 插件 是否可以查看到数据,正常如下:

#### 3.1.1.4 Kibana 操作--192.168.182.131

启动 Kibana

  1. cd /opt/kibana/kibana-6.5.4-linux-x86_64/
  2. 前台启动 ./kibana
  3. 或后台启动(建议):nohup ./bin/kibana &

若要停止 Kibana 可用以下命令:

  1. # 查看端口
  2. fuser -n tcp 5601
  3. kill -9 端口

重启 Logstash --192168.182.131

  1. cd /opt/logstash/logstash-6.5.4/config
  2. ./bin/logstash -f config/kafka.conf --config.reload.automatic
  3. # 也可以后台启动
  4. ## nohup ./bin/logstash -f config/kafka.conf --config.reload.automatic &

在浏览器中输入"http://192.168.182.131:5601",在“系统管理”中添加索引,并在“发现”中调好查看时间,查看 Kibana 最终的数据,能够从数据中得出最近成功登录nginx的数据

 完结撒花 ^-^

## 4.1 小结

本次实验,模拟测试访问 nginx 界面,通过 Filebeat 采集三个节点 nginx 的 access.log 日志,由 Logstash 读取传入 Kafka(使用大并发量场景) 中,通过 Kibana 渲染至页面中,完成对各主机的日志采集。

上图中,host.name 为2的是192.168.182.153主机,为3的是192.168.182.154主机,可见 Kibana 采集 nginx 成功登录数据成功!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/556905
推荐阅读
相关标签
  

闽ICP备14008679号