当前位置:   article > 正文

ELK+Filebeat+Kafka分布式日志管理平台搭建_elk+kafka+filebeat

elk+kafka+filebeat

目录

实验环境

一、Elasticsearch 安装部署

二、Logstash、kibana 安装

三、ELK + Filebeat 安装部署

安装 Filebeat

设置 filebeat 的主配置文件

在 Logstash 组件所在节点上新建一个 Logstash 配置文件

浏览器访问

四、 安装部署kafka

安装部署kafka之前需要先安装zookpeer

修改zookeeper配置文件

所有节点启动zook

查看集群状态

验证集群功能

配置zookeeper自启脚本【systemctl】

设置开机开机自启

五、 KAFKA集群安装

启动kafka

配置kafka自启动脚本【systemctl】

设置开机自启

环境变量

测试kafka

查看topic

发布消息

删除topic

六、Filebeat+Kafka+ELK架构部署

6.1 部署 Filebeat

6.2 部署 ELK,在 Logstash 组件所在节点上新建一个 Logstash 配置文件

七、总结



 

实验环境

Node1节点(2C/4G)node1/192.168.130.128 Elasticsearch  
Node2节点(2C/4G)node2/192.168.130.220Elasticsearch Filebeat Apache
Apache节点apache/192.168.130.250    Logstash  Apache  kibana  filebeat
zookeeper  kafaka      192.168.130.129zookeeper kafaka
zookeeper kafaka        192.168.130.20zookeeper kafaka
zookeeper kafaka        192.168.130.150zookeeper kafaka 

 实验拓扑图

 

一、Elasticsearch 安装部署

(部署在 node1 node2 )

安装方法可见上篇博客

https://blog.csdn.net/weixin_62466637/article/details/123205876?spm=1001.2014.3001.5502

 

二、Logstash、kibana 安装

(部署在apache)

安装方法可见上篇博客

https://blog.csdn.net/weixin_62466637/article/details/123205876?spm=1001.2014.3001.5502

 

三、ELK + Filebeat 安装部署

安装 Filebeat

  1. #上传软件包 filebeat-6.2.4-linux-x86_64.tar.gz 到/opt目录
  2. tar zxvf filebeat-6.2.4-linux-x86_64.tar.gz
  3. mv filebeat-6.2.4-linux-x86_64/ /usr/local/filebeat

设置 filebeat 的主配置文件

  1. cd /usr/local/filebeat
  2. vim filebeat.yml
  3. filebeat.prospectors:
  4. - type: log #指定 log 类型,从日志文件中读取消息
  5. enabled: true
  6. paths:
  7. - /var/log/messages #指定监控的日志文件
  8. - /var/log/*.log
  9. fields: #可以使用 fields 配置选项设置一些参数字段添加到 output
  10. service_name: filebeat
  11. log_type: log
  12. service_id: 192.168.80.13
  13. --------------Elasticsearch output-------------------
  14. (全部注释掉)
  15. ----------------Logstash output---------------------
  16. output.logstash:
  17. hosts: ["192.168.80.12:5044"] #指定 logstash 的 IP 和端口
  18. #启动 filebeat
  19. ./filebeat -e -c filebeat.yml

 先收集var/log下的所有日志

 指定输出的ip和端口

 

在 Logstash 组件所在节点上新建一个 Logstash 配置文件

  1. cd /etc/logstash/conf.d
  2. vim logstash.conf
  3. input {
  4. beats {
  5. port => "5044"
  6. }
  7. }
  8. output {
  9. elasticsearch {
  10. hosts => ["192.168.80.10:9200"]
  11. index => "%{[fields][service_name]}-%{+YYYY.MM.dd}"
  12. }
  13. stdout {
  14. codec => rubydebug
  15. }
  16. }
  17. #启动 logstash
  18. logstash -f logstash.conf

 

 

浏览器访问

http://192.168.130.250:5601 登录 Kibana,单击“Create Index Pattern”按钮添加索引“filebeat-*”,单击 “create” 按钮创建,单击 “Discover” 按钮可查看图表信息及日志信息。

 

 

四、 安装部署kafka

官网地址:http://kafka.apache.org/downloads/
下载地址:http://wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz

zookpeer

安装部署kafka之前需要先安装zookpeer

  1. su - root
  2. cd /u-data
  3. mkdir -p /u-data/kafka-zookeeper/kafka/data
  4. mkdir -p /u-data/kafka-zookeeper/kafka/logs
  5. mkdir -p /u-data/kafka-zookeeper/zookeeper/data
  6. mkdir -p /u-data/kafka-zookeeper/zookeeper/logs
  1. tar zxvf kafka_2.13-2.7.1.tgz
  2. tar zxf apache-zookeeper-3.6.3-bin.tar.gz

 

 

修改zookeeper配置文件

  1. 修改zookeeper配置文件
  2. cd /u-data/apache-zookeeper-3.6.3-bin/conf/
  3. cp zoo_sample.cfg zoo.cfg
  4. [root@localhost u-data]# vim /u-data/apache-zookeeper-3.6.3-bin/conf/zoo.cfg
  5. #对于从节点最初连接到主节点时的超时时间,单位为tick值的倍数
  6. tickTime=2000
  7. initLimit=10
  8. #对于主节点与从节点进行同步操作时的超时时间,单位为tick值的倍数
  9. syncLimit=5
  10. #用于配置内存数据库保存的模糊快照的目录,目录需要单独创建
  11. dataDir=/u-data/kafka-zookeeper/zookeeper/data
  12. #用于存放日志信息,目录需要单独创建
  13. dataLogDir=/u-data/kafka-zookeeper/zookeeper/logs
  14. #客户端所连接的服务器所监听的端口号
  15. clientPort=2181
  16. ###定时清理zookeeper日志配置
  17. #清理频率单位是小时需要大于1的整数
  18. autopurge.purgeInterval=24
  19. ##日志保留文件数目
  20. autopurge.snapRetainCount=10
  1. ###集群信息
  2. server.0=20.0.0.21:3188:3288
  3. server.1=20.0.0.22:3188:3288
  4. server.2=20.0.0.23:3188:3288
  5. ###注释
  6. #server.1 #1表示zookeeper节点的id这个需要与myid对应必须一样
  7. #20.0.0.21 #zookeeper节点所在机器的ip地址
  8. #3188 #zookeeper节点非选举通讯端口
  9. #3288 #zookeeper节点间选举通讯端口

 

 

  注意:

集群模式下需要新增一个名叫myid的文件,这个文件放在上述dataDir指定的目录下,这个文件里面就只有一个数据,就是配置文件中server.x的这个x(1,2,3)值,zookeeper启动时会读取这个文件,拿到里面的数据与zoo.cfg 里面的配置信息比较从而判断到底是那个server(节点)。

  1. 20.0.0.21的操作
  2. [root@localhost u-data]# echo '0' > /u-data/kafka-zookeeper/zookeeper/data/myid
  3. 20.0.0.22的操作
  4. [root@localhost u-data]# echo '1' > /u-data/kafka-zookeeper/zookeeper/data/myid
  5. 20.0.0.23的操作
  6. [root@localhost u-data]# echo '2' > /u-data/kafka-zookeeper/zookeeper/data/myid

 

所有节点启动zook

这里的节点数量必须大于等于三(且为奇数)  不然无法选举(拜占庭故事)

  1. [root@localhost conf]# /u-data/apache-zookeeper-3.6.3-bin/bin/zkServer.sh start
  2. /usr/bin/java
  3. ZooKeeper JMX enabled by default
  4. Using config: /u-data/apache-zookeeper-3.6.3-bin/bin/../conf/zoo.cfg
  5. Starting zookeeper ... STARTED

查看集群状态

  1. /u-data/apache-zookeeper-3.6.3-bin/bin/zkServer.sh status
  2. Mode: leader 为master
  3. Mode: follower 为slave

 

 

 

验证集群功能

  1. /u-data/apache-zookeeper-3.6.3-bin/bin/zkCli.sh -server 20.0.0.21:2181
  2. 回车
  3. [zk: 20.0.0.21:2186(CONNECTED) 1] ls /
  4. [zookeeper]
  5. Ctrl+c退出

配置zookeeper自启脚本【systemctl】

  1. [root@localhost conf]# cd /etc/init.d
  2. [root@localhost system]# vim zookeeper
  3. #!/bin/bash
  4. #chkconfig:2345 20 90
  5. #description:zookeeper
  6. #processname:zookeeper
  7. #export JAVA_HOME=/usr/local/JAVA ##重新安装的java环境的话要填写
  8. case $1 in
  9. start) su root /u-data/apache-zookeeper-3.6.3-bin/bin/zkServer.sh start;;
  10. stop) su root /u-data/apache-zookeeper-3.6.3-bin/bin/zkServer.sh stop;;
  11. status) su root /u-data/apache-zookeeper-3.6.3-bin/bin/zkServer.sh status;;
  12. restart) su root /u-data/apache-zookeeper-3.6.3-bin/bin/zkServer.sh restart;;
  13. *) echo "require start|stop|status|restart" ;;
  14. esac
  15. [root@localhost init.d]# chmod +x zookeeper

设置开机开机自启

  1. chkconfig --add zookeeper
  2. chkconfig zookeeper on
  3. chkconfig --list
  4. zookeeper 0:off 1:off 2:on 3:on 4:on 5:on 6:off

五、 KAFKA集群安装

【关于kafka内外网访问】https://www.cnblogs.com/xuewenlong/p/14379007.html

  1. [root@localhost config]# vim server.properties
  2. 21 broker.id=21 ##每一个broker在集群中要求唯一,所以三个节点为212222

 EXTERNAL代表外网访问地址,INTERNAL代表内网访问地址,可以查看上面的博客

  1. 31 listeners=PLAINTEXT://20.0.0.21:9092 #节点的IP区分开来
  2. #配置监听者的安全协议,如果不是多网卡,切记以下两条省略
  3. 32 stener.security.protocol.map=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
  4. 33 inter.broker.listener.name=INTERNAL
  5. 下面两个参数一个用于接收并处理网络请求的线程数,默认为3。其内部实现是采用Selector模型。启动一个线程作为Acceptor来负责建立连接,再配合启动num.network.threads个线程来轮流负责从Sockets里读取请求,一般无需改动,除非上下游并发请求量过大。一般num.network.threads主要处理网络io,读写缓冲区数据,基本没有io等待,配置线程数量为cpu核数加1.num.io.threads主要进行磁盘io操作,高峰期可能有些io等待,因此配置需要大些。配置线程数量为cpu核数2倍,最大不超过3倍.
  6. #broker处理消息的最大线程数,最好配置成CPU核数
  7. 44 num.network.threads=1
  8. #配置成CPU核数的2倍,一般不超过3
  9. 47 num.io.threads=2
  10. 125 zookeeper.connect=20.0.0.21:2181,20.0.0.22:2181,20.0.0.23:2181 ##zookeeper集群的地址
  11. 128 zookeeper.connection.timeout.ms=18000 ##ZooKeeper的连接超时时间
  12. #
  13. 136 group.initial.rebalance.delay.ms=0
  14. #kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能 #比如:/data/kafka-logs-1/data/kafka-logs-2
  15. 62 log.dirs=/u-data/kafka-zookeeper/kafka/logs

 

 

 以下为可选项

  1. #一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改
  2. 64 background.threads =4
  3. #等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,应该是一种自我保护机制。
  4. 65 queued.max.requests =500
  5. #socket的发送缓冲区,socket的调优参数SO_SNDBUFF
  6. 50 socket.send.buffer.bytes=102400
  7. #socket的接受缓冲区,socket的调优参数SO_RCVBUFF
  8. 53 socket.receive.buffer.bytes=8388608
  9. #socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
  10. 56 socket.request.max.bytes=104857600
  11. #每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
  12. 69 num.partitions=18
  13. 73 num.recovery.threads.per.data.dir=1
  14. #是否允许自动创建topic,若是false,就需要通过命令创建topic
  15. 75 default.replication.factor=3
  16. #log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性"的必要手段,所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞),如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失.
  17. 95 log.flush.interval.messages=10000
  18. #仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发.
  19. 98 log.flush.interval.ms=1000
  20. #日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
  21. 100 log.cleanup.policy=delete
  22. #数据文件保留多长时间, 存储的最大时间超过这个时间会根据log.cleanup.policy设置数据清除策略log.retention.bytes和log.retention.minutes或log.retention.hours任意一个达到要求,都会执行删除
  23. 110 log.retention.hours=72
  24. #topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
  25. 117 log.segment.bytes=1073741824
  26. #文件大小检查的周期时间,是否使用log.cleanup.policy中设置的策略
  27. 121 log.retention.check.interval.ms=300000

 

 

启动kafka

  1. /u-data/kafka_2.13-2.7.1/bin/kafka-server-start.sh -daemon /u-data/kafka_2.13-2.7.1/config/server.properties
  2. [root@localhost bin]# ps -ef | grep kafka

 

 

配置kafka自启动脚本【systemctl】

  1. [root@localhost init.d]# pwd
  2. /etc/init.d
  3. [root@localhost init.d]# vim kafka
  4. #!/bin/bash
  5. #chkconfig:2345 60 20
  6. #description:kafka
  7. #export JAVA_HOME=/usr/java/jdk1.8.0_172 ##如果新安装java,申明java环境
  8. KAFKA_HOME=/u-data/kafka_2.13-2.7.1
  9. case $1 in
  10. start) su root ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties;;
  11. stop) su root ${KAFKA_HOME}/bin/kafka-server-stop.sh;;
  12. status)
  13. count=$(ps -ef |grep kafka |egrep -cv "grep|$$")
  14. if [ "$count" -eq 0 ];then
  15. ehco "kafka is not running"
  16. else
  17. echo "kafka is running"
  18. fi
  19. ;;
  20. restart)
  21. su root ${KAFKA_HOME}/bin/kafka-server-stop.sh
  22. su root ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties
  23. ;;
  24. *) echo "require start|stop" ;;
  25. esac

设置开机自启

  1. chkconfig --add kafka
  2. chkconfig kafka on
  3. chkconfig --list
  4. kafka 0:off 1:off 2:on 3:on 4:on 5:on 6:off

环境变量

  1. [root@localhost bin]# vim /etc/profile
  2. export PATH=$PATH:/u-data/kafka_2.13-2.7.1/bin
  3. [root@localhost bin]# source /etc/profile

 

 

 

测试kafka

  1. ##创建topic
  2. [root@localhost bin]# kafka-topics.sh --create --zookeeper 20.0.0.21:2181,20.0.0.22:2181,20.0.0.23:2181 --replication-factor 1 --partitions 1 --topic test
  3. Created topic test

 

  1. --zookeeper:为zk服务器地址,已逗号分割配置多个
  2. --replication-factor:分区leader副本数,1代表没有副本即分区本身,建议为2
  3. --partitions:分区数
  4. --topic:topic名称

查看topic

  1. [root@localhost bin]# kafka-topics.sh --list --zookeeper 20.0.0.21:2181,20.0.0.22:2181,20.0.0.23:2181
  2. test
  3. 查看test topic消息
  4. [root@localhost bin]# kafka-topics.sh --describe --zookeeper 20.0.0.21:2181,20.0.0.22:2181,20.0.0.23:2181 --topic test
  5. Topic: test PartitionCount: 1 ReplicationFactor: 1 Configs:
  6. Topic: test Partition: 0 Leader: 21 Replicas: 21 Isr: 21

  1. #leader:负责处理消息的读和写,leader是从所有节点中随机选择的.
  2. #Replicas:列出了所有的副本节点,不管节点是否在服务中.
  3. #Lsr:是正在服务中的节点.

发布消息

  1. kafka-console-producer.sh --broker-list 20.0.0.21:9092 --topic test
  2. 1
  3. 2
  4. 3
  5. 4

 

 如果使用了分区  它只保证各个分区里的顺序准确

严格保证消息的消费顺序(抢红包,商品秒杀)的场景下,都需要把partition数目设置为1

  1. ##消费消息,新开一个20.0.0.21
  2. kafka-console-consumer.sh --bootstrap-server 20.0.0.21:9092 --topic test --from-beginning
  3. 1
  4. 2
  5. 3
  6. 4

删除topic

  1. [root@localhost bin]# kafka-topics.sh --delete --zookeeper 20.0.0.21:2181,20.0.0.22:2181,20.0.0.23:2181 --topic test
  2. Topic test is marked for deletion.

六、Filebeat+Kafka+ELK架构部署

6.1 部署 Filebeat

  1. cd /usr/local/filebeat
  2. vim filebeat.yml
  3. filebeat.prospectors:
  4. paths:
  5. #- /var/log/*.log
  6. - /etc/httpd/logs/access_log
  7. tags: ["access"]
  8. - type: log
  9. enabled: true
  10. paths:
  11. - /etc/httpd/logs/error_log
  12. tags: ["error"]
  13. ......
  14. #添加输出到 Kafka 的配置
  15. output.kafka:
  16. enabled: true
  17. hosts: ["192.168.80.11:9092","192.168.80.12:9092","192.168.80.13:9092"] #指定 Kafka 集群配置
  18. topic: "apache" #指定 Kafka 的 topic
  19. #启动 filebeat
  20. ./filebeat -e -c filebeat.yml

6.2 部署 ELK,在 Logstash 组件所在节点上新建一个 Logstash 配置文件

  1. cd /etc/logstash/conf.d/
  2. vim filebeat.conf
  3. input {
  4. kafka {
  5. bootstrap_servers => "192.168.80.11:9092,192.168.80.12:9092,192.168.80.13:9092"
  6. topics => "apache"
  7. type => "apache_kafka"
  8. codec => "json"
  9. auto_offset_reset => "latest"
  10. decorate_events => "true"
  11. }
  12. }
  13. output {
  14. if "access" in [tags] {
  15. elasticsearch {
  16. hosts => ["192.168.80.20:9200"]
  17. index => "apache_access-%{+YYYY.MM.dd}"
  18. }
  19. }
  20. if "error" in [tags] {
  21. elasticsearch {
  22. hosts => ["192.168.80.20:9200"]
  23. index => "apache_error-%{+YYYY.MM.dd}"
  24. }
  25. }
  26. stdout { codec => rubydebug }
  27. }
  28. #启动 logstash
  29. logstash -f filebeat.conf

 

七、总结

  1. 部署Kafka要先部署zookeeper再zk基础上部署Kafka,节点数量通常是>=3个节点,且为奇数
  2. zookeeper概念:分布式的系统管理框架,相当于文件系统+通知机制
  3. 本质:存储和管理服务的数据,如果服务状态发生变化会通知客户端
  4. 结合Kafka:生产者 push 数据到Kafka集群需要通过zk去寻找,消费者消费哪条数据也需要zk的支持,因为可以从zk获得offsset,offset(偏移量)记录上一次消费的数据到哪里了,这样就可以接着下一条数据继续进行消费。
  5. Kafka优点:发布/订阅模式(观察者模式),MQ消息队列,应用解耦可以实现异步处理,达到缓冲、流量削峰的效果
  6. Kafka 通过 zookeeper 来存储集群的 信息

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/541224
推荐阅读
相关标签
  

闽ICP备14008679号