当前位置:   article > 正文

实战--------部署搭建ELFK+zookeeper+kafka架构_felk搭建

felk搭建

目录

一、部署jdk环境

二、搭建Elasticsearch

三、搭建logstash

四、搭建kibana服务

五、搭建filebeat服务

六、搭建zookeeper与kafka服务

七、部署ELFK+zookeeper+kafka


  1. Filebeat/Fluentd:负责从各服务器节点上实时收集日志数据,Filebeat轻量级,适合大规模部署,Fluentd功能强大,支持丰富的插件和灵活的过滤规则。

  2. Kafka:作为一个分布式消息队列系统,承担起数据缓冲和中转的角色。日志数据先发送到Kafka集群,一方面可以缓解Logstash或Filebeat的压力,另一方面支持多消费者模型,允许数据被多个下游系统并行消费。此外,Kafka的高吞吐量和持久化特性使得系统在面临大量日志输入时仍能保持稳定。

  3. Logstash:从Kafka集群中消费日志数据,进行必要的数据解析、过滤、转换等预处理操作,然后将结构化后的数据发送到Elasticsearch

  4. ZooKeeper:在某些场景下,ZooKeeper可以用于管理Kafka集群的元数据,例如Broker注册、Topic的分区分配等,确保Kafka集群的稳定性和一致性。同时,对于Logstash或Kafka Connect这类组件,也可以通过ZooKeeper获取集群配置信息。

  5. Elasticsearch:存储经过处理的日志数据,提供全文搜索、聚合分析等功能,便于后期进行日志分析和故障排查。

  6. Kibana:作为前端展示工具,基于Elasticsearch的数据创建可视化图表和仪表盘,为用户提供友好的日志分析界面。

综合起来,ELFK+Zookeeper+Kafka架构结合了日志收集、处理、存储和分析的全链条,极大地提高了日志管理的效率和用户体验。

在ELFK+Zookeeper+Kafka架构中,日志数据的流转路径通常是这样的: 应用程序日志 -> Filebeat -> Kafka -> (Logstash ->) Elasticsearch -> Kibana

环境准备

IP地址主机名安装服务
192.168.83.30node1

JDK-1.18.0

Elasticsearch-6.6.1

192.168.83.40node2

JDK-1.18.0

Elasticsearch-6.6.1

192.168.83.50logstash

JDK-1.18.0

logstash-6.6.1

httpd

192.168.83.60kibana

JDK-1.18.0

kibana-6.6.1

192.168.83.70filebeat

JDK-1.18.0

filebeat-6.6.1

192.168.83.80zk-ka1

JDK-1.18.0

apache-zookeeper-3.5.7-bin

kafka_2.13-2.7.1

192.168.83.90zk-ka2

JDK-1.18.0

apache-zookeeper-3.5.7-bin

kafka_2.13-2.7.1

192.168.83.100zk-ka3

JDK-1.18.0

apache-zookeeper-3.5.7-bin

kafka_2.13-2.7.1

一、部署jdk环境

在所有机器上使用脚本安装jdk环境

  1. #!/bin/bash
  2. JAVA=`find  / -name  *jdk*tar*  -exec dirname {} \;`
  3. #找到jdk的压缩包所在目录,并将设置为变量JAVA
  4. JDK=`find  /  -name  *jdk*tar* 2>>/dev/null |awk  -F/ '{print $NF}'`
  5. #找打jdk压缩包的名字,设置为变量JDK
  6. cd ${JAVA}
  7. tar xf ${JDK} -C /usr/local/
  8. #切换到压缩包所在目录,并指定解压到/usr/local/目录下
  9. JDKAPP=`find  /usr/local/   -maxdepth 1  -type  d  |grep  jdk|awk  -F/  '{print  $NF}'`
  10. #找到解压后的目录名,并设置为变量名JDKAPP
  11. ln  -s  /usr/local/${JDKAPP}  /usr/local/jdk
  12. #做软链接,便于shell环境识别命令
  13. cat > /etc/profile.d/jdk.sh  <<EOF
  14. export JAVA_HOME=/usr/local/jdk
  15. export PATH=\$JAVA_HOME/bin:\$PATH
  16. export JRE_HOME=\$JAVA_HOME/jre 
  17. export CLASSPATH=\$JAVA_HOME/lib/:\$JRE_HOME/lib/
  18. EOF
  19. #修改环境变量
  20. echo "请执行  source /etc/profile.d/jdk.sh 命令,刷新文件

小结

  1. '-----------------------------脚本安装JDK-----------------------------'
  2. [root@node1 opt]#cat jdk.sh
  3. #!/bin/bash
  4. JAVA=`find / -name *jdk*tar* -exec dirname {} \;`
  5. JDK=`find / -name *jdk*tar* 2>>/dev/null |awk -F/ '{print $NF}'`
  6. cd ${JAVA}
  7. tar xf ${JDK} -C /usr/local/
  8. JDKAPP=`find /usr/local/ -maxdepth 1 -type d |grep jdk|awk -F/ '{print $NF}'`
  9. ln -s /usr/local/${JDKAPP} /usr/local/jdk
  10. cat > /etc/profile.d/jdk.sh <<EOF
  11. export JAVA_HOME=/usr/local/jdk
  12. export PATH=\$JAVA_HOME/bin:\$PATH
  13. export JRE_HOME=\$JAVA_HOME/jre
  14. export CLASSPATH=\$JAVA_HOME/lib/:\$JRE_HOME/lib/
  15. EOF
  16. echo "请执行 source /etc/profile.d/jdk.sh 命令,刷新文件"
  17. [root@node1 opt]#bash jdk.sh
  18. [root@node1 opt]#source /etc/profile.d/jdk.sh

二、搭建Elasticsearch

环境准备

IP地址主机名安装服务
192.168.83.30node1

JDK-1.18.0

Elasticsearch-6.6.1

192.168.83.40node2

JDK-1.18.0

Elasticsearch-6.6.1

修改配置文件


 

  1. cluster.name: elk-cluster
  2. #设置Elasticsearch集群的名称为 "elk-cluster"。
  3. #这意味着所有带有相同cluster.name配置的Elasticsearch节点将会尝试加入同一个集群。
  4. node.name: node1
  5. #指定当前节点的名字为 "node1",进行区分,便于集群内部管理和监控集群中的各个节点。
  6. '另一台服务器一般指定为node2。也可以设置IP地址'
  7. path.data: /data/elk_data
  8. #设置Elasticsearch数据存储路径,这是Elasticsearch存放索引数据的地方。
  9. '需要自己手动创建,并修改属主与属组'
  10. path.logs: /var/log/elasticsearch
  11. #设置Elasticsearch日志文件的存放路径,Elasticsearch的所有日志都会写入到这个目录下的文件
  12. bootstrap.memory_lock: false
  13. #设置为 "false" 表示不锁定内存。
  14. #若设为 true,Elasticsearch将尝试锁定全部分配给它的内存,防止在操作系统层面被交换到磁盘。
  15. #在生产环境中,通常建议开启内存锁定以获得更好的性能和稳定性,但需要有足够的权限,并确保物理内存充足。
  16. network.host: 0.0.0.0
  17. #设置为 "0.0.0.0",表示Elasticsearch节点监听所有可用网络接口上的连接请求,对外提供服务。
  18. http.port: 9200
  19. #设置HTTP协议的服务端口为 "9200"
  20. discovery.zen.ping.unicast.hosts: ["node1", "node2"]
  21. #设置Elasticsearch集群发现机制的初始节点列表,这里包含了 "node1" 和 "node2" 两个节点地址。
  22. #集群中的节点通过互相ping这些地址来发现彼此并组建集群。
  23. #在启动过程中,节点会尝试联系这个列表中的其他节点来参与集群。

启动服务

登录web界面查看信息

小结

  1. '-----------------------------安装elasticsearch-----------------------------'
  2. [root@node1 opt]#ls
  3. elasticsearch-6.6.1.rpm jdk-8u291-linux-x64.tar.gz jdk.sh
  4. [root@node1 opt]#rpm -ivh elasticsearch-6.6.1.rpm
  5. [root@node1 opt]#systemctl daemon-reload
  6. [root@node1 opt]#systemctl enable elasticsearch.service
  7. '-----------------------------修改配置文件-----------------------------'
  8. [root@node1 opt]#cp /etc/elasticsearch/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml.bak
  9. [root@node1 opt]#vim /etc/elasticsearch/elasticsearch.yml
  10. 17 cluster.name: my-esh
  11. 23 node.name: node1
  12. 33 path.data: /data/elk_data
  13. 37 path.logs: /var/log/elasticsearch
  14. 43 bootstrap.memory_lock: false
  15. 55 network.host: 0.0.0.0
  16. 59 http.port: 9200
  17. 68 discovery.zen.ping.unicast.hosts: ["node1", "node2"]
  18. [root@node1 opt]#cat /etc/hosts
  19. 127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
  20. ::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
  21. 192.168.83.30 node1
  22. 192.168.83.40 node2
  23. [root@node1 opt]#mkdir -p /data/elk_data
  24. [root@node1 opt]#chown elasticsearch.elasticsearch /data/elk_data

三、搭建logstash

IP地址主机名安装服务
192.168.83.50logstash

JDK-1.18.0

logstash-6.6.1

httpd

测试服务情况

收到返回信息后在elasticsearch服务器上查看

小结

  1. '-----------------------------安装logstash服务-----------------------------'
  2. [root@logstash opt]#rpm -ivh logstash-6.6.1.rpm
  3. [root@logstash opt]#systemctl enable --now logstash.service
  4. [root@logstash opt]#ln -s /usr/share/logstash/bin/logstash /usr/local/bin
  5. '-----------------------------测试logstash服务-----------------------------'
  6. [root@logstash opt]#logstash -e 'input { stdin{} } output { elasticsearch { hosts=>["192.168.83.30:9200"] } }'
  7. www.baidu.com
  8. ...........

四、搭建kibana服务

环境准备

IP地址主机名安装服务
192.168.83.60kibana

JDK-1.18.0

kibana-6.6.1

使用rpm包直接安装

修改配置文件

  1. server.port: 5601
  2. #此行配置了Kibana服务监听的端口号为5601
  3. server.host: "0.0.0.0"
  4. #该配置指示Kibana服务器绑定到所有可用的网络接口
  5. elasticsearch.hosts: ["http://192.168.83.30:9200"]
  6. #配置了Kibana连接的Elasticsearch集群地址和端口
  7. kibana.index: ".kibana"
  8. #Kibana使用此配置来指定在Elasticsearch中存储其自身配置和状态的索引名称

验证kibana服务

在logstash服务器上开启httpd服务:systemctl start httpd

创建收集httpd服务日志信息的配置文件

  1. #input字段
  2. #Logstash使用两个file输入插件,分别配置了两个日志文件路径
  3. '第一个file插件配置了Apache服务器的访问日志路径/etc/httpd/logs/access_log,
  4. 并将日志类型标记为"type"字段的"access"。'
  5. '第二个file插件配置了Apache服务器的错误日志路径/etc/httpd/logs/error_log,并将日志类型标记为"type"字段的"error"'
  6. 'start_position => "beginning" 表示从日志文件的开始位置读取数据'
  7. #这对于初次启动或重新配置Logstash时很有用,可以确保从头开始处理日志。
  8. #output字段
  9. '根据[type]字段的值,Logstash通过elasticsearch输出插件将处理后的数据发送到Elasticsearch集群'
  10. '当[type]字段等于"access"时,数据将被发送到Elasticsearch,并按日期格式创建索引,
  11. 例如apache_access-2024.04.14,每天都会创建一个新的索引。'
  12. '当[type]字段等于"error"时,数据同样被发送到相同的Elasticsearch集群
  13. 但是存储在按日期格式命名的另一个索引中,例如apache_error-2024.04.14'

创建索引访问

小结

  1. '-----------------------------安装kibana服务-----------------------------'
  2. [root@kibana opt]#rpm -ivh kibana-6.6.1-x86_64.rpm
  3. 警告:kibana-6.6.1-x86_64.rpm: 头V4 RSA/SHA512 Signature, 密钥 ID d88e42b4: NOKEY
  4. 准备中... ################################# [100%]
  5. 正在升级/安装...
  6. 1:kibana-6.6.1-1 ################################# [100%]
  7. [root@kibana opt]#vim /etc/kibana/kibana.yml
  8. 2行 server.port: 5601
  9. 7行 server.host: "0.0.0.0"
  10. 28行 elasticsearch.hosts: ["http://192.168.83.30:9200"]
  11. 37行 kibana.index: ".kibana"
  12. [root@kibana opt]#systemctl enable --now kibana.service
  13. [root@kibana opt]#netstat -natp | grep 5601
  14. tcp 0 0 0.0.0.0:5601 0.0.0.0:* LISTEN 60998/node
  15. #浏览192.168.83.50:5601进行访问测试

五、搭建filebeat服务

IP地址主机名安装服务
192.168.83.70filebeat

JDK-1.18.0

filebeat-6.6.1

修改配置文件

  1. 21 - type: log
  2. #定义了一个名为"log"类型的输入处理器,这是一种用于从日志文件中收集数据的模块
  3. 24 enabled: true
  4. #启用该模块
  5. 27 paths:
  6. 28 - /var/log/*.log
  7. 29 - /var/log/messages
  8. #指定了需要监控的日志文件路径
  9. 46 fields:
  10. 47 server_name: fb
  11. 48 log_type: log
  12. 49 server_id: 192.168.83.70
  13. #这些字段会被附加到从日志文件中提取的每条事件上。
  14. 152 #output.elasticsearch:
  15. 153 # Array of hosts to connect to.
  16. 154 # hosts: ["localhost:9200"]
  17. #将输出到elasticsearch模块的信息注释掉
  18. 165 output.logstash:
  19. 167 hosts: ["192.168.83.50:5044"]
  20. #将信息输出到logstash服务器上,并指定服务器地址与端口号

在logstash服务器上添加接收filebeat服务器传输的信息的配置文件

  1. input {
  2. beats {
  3. port => "5044"
  4. }
  5. }
  6. #定义logstash的5044端口,接收来自beats工具的所有数据
  7. output {
  8. elasticsearch {
  9. hosts => ["192.168.83.30:9200"]
  10. index => "%{[fields][server_name]}-%{+YYYY.MM.dd}"
  11. }
  12. }
  13. #将接收的数据传输到elasticsearch服务器上
  14. #并将索引名称定义为filebeat配置文件中,事件字段fields中server_name的值,并以时间结尾

在另一个终端上查看端口号是否开启

确认logstash服务器的5044端口还在监听状态后,在filebeat服务器上启动filebeat服务并加载配置文件

filebeat -e -c /etc/filebeat/filebeat.yml

登录kibana服务web界面

小结

  1. '-----------------------------安装启动filebeat服务-----------------------------'
  2. [root@filebeat opt]#rpm -ivh filebeat-6.6.1-x86_64.rpm
  3. [root@filebeat opt]#cp /etc/filebeat/filebeat.yml /etc/filebeat/filebeat.yml_bak
  4. [root@filebeat opt]#vim /etc/filebeat/filebeat.yml
  5. 21 - type: log
  6. 24 enabled: true
  7. 27 paths:
  8. 28 - /var/log/*.log
  9. 29 - /var/log/messages
  10. 46 fields:
  11. 47 server_name: fb
  12. 48 log_type: log
  13. 49 server_id: 192.168.83.70
  14. 152 #output.elasticsearch:
  15. 153 # Array of hosts to connect to.
  16. 154 # hosts: ["localhost:9200"]
  17. 165 output.logstash:
  18. 167 hosts: ["192.168.83.50:5044"]
  19. [root@filebeat opt]#filebeat -e -c /etc/filebeat/filebeat.yml
  20. .......
  21. '-----------------------------添加logstash服务配置文件-----------------------------'
  22. [root@logstash ~]#vim /etc/logstash/conf.d/fb.conf
  23. input {
  24. beats {
  25. port => "5044"
  26. }
  27. }
  28. output {
  29. elasticsearch {
  30. hosts => ["192.168.83.30:9200"]
  31. index => "%{[fields][server_name]}-%{+YYYY.MM.dd}"
  32. }
  33. }
  34. [root@logstash ~]#logstash -f /etc/logstash/conf.d/fb.conf
  35. ............

六、搭建zookeeper与kafka服务

IP地址主机名安装服务
192.168.83.80zk-ka1

JDK-1.18.0

apache-zookeeper-3.5.7-bin

kafka_2.13-2.7.1

192.168.83.90zk-ka2

JDK-1.18.0

apache-zookeeper-3.5.7-bin

kafka_2.13-2.7.1

192.168.83.100zk-ka3

JDK-1.18.0

apache-zookeeper-3.5.7-bin

kafka_2.13-2.7.1

在三台机器上安装zookeeper服务与Kafka服务

修改zookeeper服务配置文件

三台服务器配置文件相同

  1. tickTime=2000
  2. #基本时间单元,所有超时和心跳时间间隔都以tickTime的倍数来表示。这里设置为2000毫秒,即2秒
  3. initLimit=10
  4. #初始化连接时的最大时间限制,单位为tickTime。
  5. #当follower启动并试图连接leader时,follower在10*2秒(即20秒)内必须完成与leader初始同步
  6. syncLimit=5
  7. #leader与follower之间发送消息的同步确认的最大时间限制,同样单位为tickTime。
  8. #这意味着follower必须在5*2秒(即10秒)内响应leader的心跳或同步请求。
  9. dataDir=/usr/local/zookeeper-3.5.7/data
  10. #ZooKeeper存放持久化数据的目录,如事务日志、快照等。需要手动创建
  11. dataLogDir=/usr/local/zookeeper-3.5.7/logs
  12. #ZooKeeper专门存放事务日志的目录,分离数据和日志存储可以优化磁盘I/O性能。需要手动创建
  13. clientPort=2181
  14. #ZooKeeper服务监听客户端连接的端口号,客户端通过这个端口与ZooKeeper集群进行通信。
  15. server.1=192.168.83.70:3188:3288
  16. server.2=192.168.83.80:3188:3288
  17. server.3=192.168.83.90:3188:3288
  18. #这三行配置描述了ZooKeeper集群中的三个服务器节点。格式为server.id=hostname:port1:port2
  19. #其中:
  20. #id(1、2、3)是集群中服务器的唯一标识。
  21. #hostname(192.168.83.70、192.168.83.80、192.168.83.90)是服务器的IP地址。
  22. #port1(3188)是集群内部通信的端口,用于follower和observer与其他服务器通信。
  23. #port2(3288)是选举leader时服务器之间通信的端口。

修改kafka服务配置文件

三台服务器的kafka配置文件中只有broker.id、listeners字段值不同

  1. broker.id=0
  2. #定义当前Kafka Broker的唯一标识ID。在集群中,每个Broker的ID都应是唯一的。
  3. #其它两台服务器设置为1,2
  4. listeners=PLAINTEXT://192.168.83.70:9092
  5. #指定Broker监听客户端连接的地址和端口,这里是使用PLAINTEXT协议
  6. #三台服务器分别监听本机的IP地址
  7. num.network.threads=3
  8. #设置网络IO线程数量,用于处理网络请求,比如接收生产者的消息和响应消费者的请求。
  9. num.io.threads=8
  10. #设置磁盘IO线程数量,用于处理磁盘读写操作,如写入日志文件和从磁盘读取消息。
  11. socket.send.buffer.bytes=102400 和 socket.receive.buffer.bytes=102400
  12. #分别设置Socket发送和接收缓冲区的大小(单位为字节),影响TCP层的数据传输效率。
  13. socket.request.max.bytes=104857600
  14. #设置单个请求允许的最大字节数,超过这个大小的请求会被拒绝。
  15. log.dirs=/usr/local/kafka/logs
  16. #设置Kafka日志数据的存储目录,即Kafka消息持久化的路径。
  17. num.partitions=1
  18. #默认每个主题的分区数量,这里设定为每个主题初始化时只有一个分区。
  19. num.recovery.threads.per.data.dir=1
  20. #指定每个数据目录(日志目录)下用于恢复数据的线程数量。
  21. offsets.topic.replication.factor=1 和 transaction.state.log.replication.factor=1
  22. #这两个配置分别指定了Kafka内部主题(__consumer_offsets)和事务状态日志的复制因子,
  23. #这里均为1,表示没有数据复制,所有数据仅在一个Broker上存储。
  24. transaction.state.log.min.isr=1
  25. #设置事务状态日志的最小ISR大小,这里为1,意味着只要有一个副本是同步的,就满足要求。
  26. log.retention.hours=168
  27. #设置日志保留时间,这里是168小时(7天),超过这个时间的数据将被删除。
  28. log.segment.bytes=1073741824
  29. #每个日志分段的大小,当达到这个阈值时,Kafka会创建新的日志分段。
  30. log.retention.check.interval.ms=300000
  31. #日志清理检查间隔,每隔300000毫秒(5分钟)检查一次日志是否需要删除。
  32. zookeeper.connect=192.168.83.70:2181,192.168.83.80:2181,192.168.83.90:2181
  33. #设置连接ZooKeeper集群的地址和端口,此处配置了一个由三个节点构成的ZooKeeper集群。
  34. zookeeper.connection.timeout.ms=18000
  35. #ZooKeeper连接超时时间,设置为18000毫秒(18秒)。
  36. group.initial.rebalance.delay.ms=0
  37. #消费者组在启动时的初始再平衡延迟时间,设置为0表示消费者在加入组时立即开始再平衡。

创建zookeeper启动脚本,三台服务器脚本文件相同

  1. #!/bin/bash
  2. 声明该脚本使用bash shell进行解释执行。
  3. #chkconfig:2345 20 90
  4. 这行注释是针对Red Hat家族(如CentOS、Fedora等)Linux发行版的chkconfig工具的指令,
  5. #用于在不同运行级别(2、3、4、5)下设置服务的启动优先级(20,较高)和停止优先级(90,较低)。
  6. #description:Zookeeper Service Control Script
  7. 描述脚本的功能,即ZooKeeper服务的控制脚本。
  8. ZK_HOME='/usr/local/zookeeper-3.5.7'
  9. #定义ZooKeeper的安装目录,便于在脚本中引用。
  10. 脚本主体部分使用case $1 in结构,根据传入的第一个参数($1)执行相应的操作:
  11. start:启动ZooKeeper服务,执行$ZK_HOME/bin/zkServer.sh start命令。
  12. stop:停止ZooKeeper服务,执行$ZK_HOME/bin/zkServer.sh stop命令。
  13. restart:重启ZooKeeper服务,执行$ZK_HOME/bin/zkServer.sh restart命令。
  14. status:查看ZooKeeper服务的状态,执行$ZK_HOME/bin/zkServer.sh status命令。
  15. *:如果传入的参数不在上述情况中,脚本将打印正确的使用方式。
  16. chmod +x /etc/init.d/zookeeper #添加执行权限
  17. chkconfig --add zookeeper #设置开机自动进行管理

启动服务

创建kafka启动脚本,三台服务器脚本文件相同,并依次启动

  1. #!/bin/bash:定义脚本使用的解释器为Bash Shell。
  2. #chkconfig:2345 22 88
  3. #这是针对Red Hat系列Linux(如CentOS、Fedora等)的 chkconfig 工具的注释行,
  4. #用来设置服务在运行级别2、3、4、5下的启动优先级(22)和关闭优先级(88)。
  5. #description:Kafka Service Control Script
  6. #描述脚本功能,即用于控制Apache Kafka服务的启动、停止、重启和查看状态。
  7. KAFKA_HOME='/usr/local/kafka'
  8. #定义Kafka的安装目录。
  9. case $1 in
  10. #根据用户传入的第一个参数($1)执行不同的操作。
  11. start
  12. #启动Kafka服务,
  13. #通过${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties命令执行。
  14. #-daemon选项表示以守护进程方式运行Kafka服务器,server.properties是Kafka的配置文件。
  15. stop
  16. #停止Kafka服务,执行${KAFKA_HOME}/bin/kafka-server-stop.sh命令。
  17. restart
  18. #先调用 $0 stop 停止服务,然后调用$0start重新启动服务。
  19. status
  20. #检查Kafka服务是否正在运行。
  21. #通过ps -ef | grep kafka | egrep -cv "grep|$$"命令统计Kafka进程的数量,
  22. #如果不为0,则说明Kafka正在运行;否则,Kafka未运行。
  23. *
  24. #如果传入的参数不在上述情况中,脚本将打印正确的使用方式

小结

  1. '-----------------------------安装zookeeper服务--------------------------------'
  2. [root@kibana opt]#tar xf apache-zookeeper-3.5.7-bin.tar.gz
  3. [root@kibana opt]#mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7
  4. [root@kibana opt]#cd /usr/local/zookeeper-3.5.7/conf/
  5. [root@kibana opt]#cp zoo_sample.cfg zoo.cfg
  6. [root@kibana opt]#vim zoo.cfg
  7. 2 tickTime=2000
  8. 5 initLimit=10
  9. 8 syncLimit=5
  10. 12 dataDir=/usr/local/zookeeper-3.5.7/data
  11. 13 dataLogDir=/usr/local/zookeeper-3.5.7/logs
  12. 15 clientPort=2181
  13. 30 server.1=192.168.83.80:3188:3288
  14. 31 server.2=192.168.83.90:3188:3288
  15. 32 server.3=192.168.83.100:3188:3288
  16. [root@kibana opt]#mkdif -p /usr/local/zookeeper-3.5.7/data
  17. [root@kibana opt]#mkdif -p /usr/local/zookeeper-3.5.7/logs
  18. '---------------------------创建zookeeper服务启动脚本------------------------------'
  19. [root@kibana opt]#vim /etc/init.d/zookeeper
  20. #!/bin/bash
  21. #chkconfig:2345 20 90
  22. #description:Zookeeper Service Control Script
  23. ZK_HOME='/usr/local/zookeeper-3.5.7'
  24. case $1 in
  25. start)
  26. echo "---------- zookeeper 启动 ------------"
  27. $ZK_HOME/bin/zkServer.sh start
  28. ;;
  29. stop)
  30. echo "---------- zookeeper 停止 ------------"
  31. $ZK_HOME/bin/zkServer.sh stop
  32. ;;
  33. restart)
  34. echo "---------- zookeeper 重启 ------------"
  35. $ZK_HOME/bin/zkServer.sh restart
  36. ;;
  37. status)
  38. echo "---------- zookeeper 状态 ------------"
  39. $ZK_HOME/bin/zkServer.sh status
  40. ;;
  41. *)
  42. echo "Usage: $0 {start|stop|restart|status}"
  43. esac
  44. [root@kibana opt]#chmod +x /etc/init.d/zookeeper
  45. [root@kibana opt]#chkconfig --add zookeeper
  46. [root@kibana opt]#service zookeeper start
  47. '-----------------------------安装zookeeper服务--------------------------------'
  48. [root@kibana opt]#tar xf kafka_2.13-2.7.1.tgz
  49. [root@kibana opt]#mv kafka_2.13-2.7.1 /usr/local/kafka
  50. [root@kibana opt]#cd /usr/local/kafka/config/
  51. [root@kibana opt]#cp server.properties server.properties_bak
  52. [root@kibana opt]#vim server.properties
  53. 21 broker.id=0 #每台服务器的唯一标识不一样
  54. 31 listeners=PLAINTEXT://192.168.83.80:9092 #监听本机的IP地址的9092的端口号
  55. 42 num.network.threads=3
  56. 45 num.io.threads=8
  57. 48 socket.send.buffer.bytes=102400
  58. 51 socket.receive.buffer.bytes=102400
  59. 54 socket.request.max.bytes=104857600
  60. 60 log.dirs=/usr/local/kafka/logs
  61. 65 num.partitions=1
  62. 69 num.recovery.threads.per.data.dir=1
  63. 74 offsets.topic.replication.factor=1
  64. 75 transaction.state.log.replication.factor=1
  65. 76 transaction.state.log.min.isr=1
  66. 103 log.retention.hours=168
  67. 110 log.segment.bytes=1073741824
  68. 114 log.retention.check.interval.ms=300000
  69. 123 zookeeper.connect=192.168.83.80:2181,192.168.83.90:2181,192.168.83.100:2181
  70. 126 zookeeper.connection.timeout.ms=18000
  71. 136 group.initial.rebalance.delay.ms=0
  72. '---------------------------创建kafka服务启动脚本------------------------------'
  73. [root@kibana opt]#vim /etc/init.d/kafka
  74. #!/bin/bash
  75. #chkconfig:2345 22 88
  76. #description:Kafka Service Control Script
  77. KAFKA_HOME='/usr/local/kafka'
  78. case $1 in
  79. start)
  80. echo "---------- Kafka 启动 ------------"
  81. ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
  82. ;;
  83. stop)
  84. echo "---------- Kafka 停止 ------------"
  85. ${KAFKA_HOME}/bin/kafka-server-stop.sh
  86. ;;
  87. restart)
  88. $0 stop
  89. $0 start
  90. ;;
  91. status)
  92. echo "---------- Kafka 状态 ------------"
  93. count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
  94. if [ "$count" -eq 0 ];then
  95. echo "kafka is not running"
  96. else
  97. echo "kafka is running"
  98. fi
  99. ;;
  100. *)
  101. echo "Usage: $0 {start|stop|restart|status}"
  102. esac
  103. [root@kibana opt]#chmod +x /etc/init.d/kafka
  104. [root@kibana opt]#chkconfig --add kafka
  105. [root@kibana opt]#service kafka start

七、部署ELFK+zookeeper+kafka

(一)修改filebeat服务器配置文件

  1. filebeat.inputs
  2. #定义了Filebeat应该监控哪些日志文件,并接收日志信息。
  3. - type: log
  4. #定义输入类型为日志文件。
  5. enabled: true
  6. #启用此输入源
  7. paths:
  8. #指定日志文件的路径列表,指定/var/log/nginx/access_log,表示监控nginx服务器的访问日志
  9. tags: ["access"]
  10. #给收集到的数据打上标签,这里打上了"access"标签,可以在后续处理和分析时用于筛选或分类
  11. '第二段信息含义与第一段一致'

输入:filebeat  -e -c /etc/filebeat/filebeat.yml   加载配置文件

(二)在logstash服务器上添加配置文件

  1. input 字段:
  2. kafka {
  3. #表示从Kafka集群中读取数据:
  4. bootstrap_servers => "192.168.10.17:9092,192.168.10.21:9092,192.168.10.22:9092"
  5. #指定了Kafka集群中Broker服务器的地址列表,Logstash将尝试连接这些地址以获取消息。
  6. topics => "nginx"
  7. #指定了要订阅的主题名称,这里是"nginx"。
  8. type
  9. #设置了输入数据的类型为"nginx_kafka"
  10. codec => "json"
  11. #配置了数据编码解码器为"json",这意味着Logstash将自动解析从Kafka接收到的消息内容为JSON格式。
  12. auto_offset_reset => "earliest"
  13. #设置为"earliest",从最初的消息开始读取。
  14. decorate_events => true
  15. #表示Logstash将在传递给Elasticsearch的数据中添加Kafka相关的元数据信息。
  16. output字段:
  17. 根据数据中的tags字段进行逻辑判断,分别将数据输出到不同的Elasticsearch索引:
  18. #若tags字段包含"access",则将数据写入名为"nginx_access-%{+YYYY.MM.dd}"的索引
  19. #若tags字段包含"error",则将数据写入名为"nginx_error-%{+YYYY.MM.dd}"的索引

在elasticsearch服务器上查看所有索引

在kibana服务器上建立索引并查看信息

  1. '-----------------------------修改filebeat服务配置文件--------------------------------'
  2. [root@filebeat opt]#vim /etc/filebeat/filebeat.yml
  3. 15 filebeat.inputs:
  4. 16 - type: log
  5. 17 enabled: true
  6. 18 paths:
  7. 19 - /var/log/nginx/access.log
  8. 20 tags: ["access"]
  9. 21 - type: log
  10. 22 enabled: true
  11. 23 paths:
  12. 24 - /var/log/nginx/error.log
  13. 25 tags: ["error"]
  14. 160 #----------------------------- Kafka output --------------------------------
  15. 161 output.kafka:
  16. 162 enabled: true
  17. 163 hosts: ["192.168.83.80:9092","192.168.83.90:9092","192.168.83.100:9092"]
  18. 164 topic: "nginx"
  19. 165 #----------------------------- Logstash output --------------------------------
  20. 166 #output.logstash:
  21. 167 # The Logstash hosts
  22. 168 # hosts: ["192.168.83.50:5044"]
  23. [root@filebeat opt]#filebeat -e -c /etc/filebeat/filebeat.yml
  24. '-----------------------------添加logstash服务配置文件--------------------------------'
  25. [root@logstash ~]#vim /etc/logstash/conf.d/kafka.conf
  26. input {
  27. kafka {
  28. bootstrap_servers => "192.168.83.80:9092,192.168.83.90:9092,192.168.83.100:9092"
  29. topics => "nginx"
  30. type => "nginx_kafka"
  31. codec => "json"
  32. auto_offset_reset => "earliest"
  33. decorate_events => true
  34. }
  35. }
  36. output {
  37. if "access" in [tags] {
  38. elasticsearch {
  39. hosts => ["192.168.83.30:9200"]
  40. index => "nginx_access-%{+YYYY.MM.dd}"
  41. }
  42. }
  43. if "error" in [tags] {
  44. elasticsearch {
  45. hosts => ["192.168.83.30:9200"]
  46. index => "nginx_error-%{+YYYY.MM.dd}"
  47. }
  48. }
  49. }
  50. [root@logstash ~]#logstash -f /etc/logstash/conf.d/kafka.conf
  51. #登录kibana服务web界面创建索引进行查看
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/代码探险家/article/detail/1007730
推荐阅读
相关标签
  

闽ICP备14008679号