当前位置:   article > 正文

ELK日志集成(filebeat+kafka+logstash+elasticsearch+kibana)_filebeat elasticsearch kafka logstash kibana

filebeat elasticsearch kafka logstash kibana

1、日志文件准备

首先日志集成json格式,这个在我搭建的项目案例中的logback-spring.xml中有过配置如何将日志转成json格式,具体参考gitee项目的logback-spring.xml配置,也可参考之前写过的一篇文章

2、准备安装包及环境

需要kafka集群,Elasticsearch集群, logstash,filebeat,Kibana

Elasticsearch集群参考我之前的https://blog.csdn.net/xibei19921101/article/details/112527812这篇文章

Kafka集群参考https://blog.csdn.net/xibei19921101/article/details/119419868这篇文章

Elasticsearch、Kibanafilebeatlogstash下载都可以在 elastic 的官方网站获取自己想要的版本

  1. ##下载filebeat,6.8.7的版本
  2. wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.8.7-linux-x86_64.tar.gz
  3. ##下载kibana,6.8.7的版本
  4. wget https://artifacts.elastic.co/downloads/kibana/kibana-6.8.7-linux-x86_64.tar.gz
  5. ##下载logstash,6.8.7的版本
  6. wget https://artifacts.elastic.co/downloads/logstash/logstash-6.8.7.tar.gz

 3、安装filebeat 

在应用启动的机器上安装filebeat,将日志推送到kafka集群中

安装步骤:解压缩下载的压缩包,修改filebeat.yml文件,创建启动脚本restart.sh

restart.sh

  1. #!/bin/bash
  2. pids=`ps -ef| grep filebeat |grep -v grep |awk '{print $2}'`
  3. if [[ -z $pids ]]; then
  4. echo 'process of filebeat not exist,begin to run'
  5. nohup ./filebeat -e -c filebeat.yml >filebeat.log 2>&1 &
  6. ##可以防止日志爆盘,将所有标准输出及标准错误输出到/dev/null空设备,即没有任何输出信息。
  7. #nohup ./filebeat -e -c filebeat.yml >/dev/null &
  8. else
  9. echo 'filebeat exist,will be killed and restart'
  10. kill -9 $pids
  11. echo 'filebeat has been killed...'
  12. echo 'filebeat restart now...'
  13. nohup ./filebeat -e -c filebeat.yml >filebeat.log 2>&1 &
  14. fi

 filebeat.yml:

  1. filebeat:
  2. prospectors:
  3. -
  4. paths:
  5. - /home/scene/bin/logs/springboot_demo/json.log
  6. #filebeat可推送多个日志文件,可以继续换行添加
  7. #- /home/scene/bin/*/json.log #可以使用通配符指定扫描一系列日志
  8. input_type: log
  9. exclude_files: ['health.log$']
  10. scan_freqency: "10s"
  11. backoff: "1s"
  12. json.message_key: log
  13. json.keys_under_root: true
  14. json.overwrite_keys: true
  15. #fields: {project: "myproject", instance-id: "123452341435"} #可在最终日志输出字段中添加额外的字段
  16. #fields_under_root: true #若此字段设置为true,则自定义字段将存储为输出文档中的顶级字段,而不是在fields字段下分组,若自定义字段与其他字段名冲突,则自定义字段将覆盖其他字段
  17. #include_lines: ['ERROR','WARN']
  18. output:
  19. kafka:
  20. enable: true
  21. hosts: ["192.168.236.131:9092","192.168.236.130:9092","192.168.236.129:9092"] #服务集群
  22. topic: "elklog"
  23. partition.hash:
  24. reachable_only: true
  25. compression: gzip
  26. max_message_bytes: 1000000
  27. required_acks: 1

 在kafka集群中创建filebeat.yml配置的elklog主题

  1. #在kafka的bin目录下
  2. ./kafka-topics.sh --create --zookeeper 192.168.236.131:2181,192.168.236.130:2181,192.168.236.129:2181 --replication-factor 1 --partitions 3 --topic elklog

启动filebeat,以及之前的kafka集群,即可看到elklog主题有消息推送

4、安装kibana

解压下载的压缩包,进入config目录

修改kibana.yml文件配置,直接配置成如下即可,对应的ip地址改下

  1. # 具体kibana.yml的配置参数说明参考https://www.cnblogs.com/sanduzxcvbnm/p/12837155.html
  2. server.port: 5601
  3. server.host: "192.168.236.131"
  4. # 用来处理所有查询的Elasticsearch实例的URL
  5. elasticsearch.hosts: ["http://192.168.236.131:9200"]
  6. # 用来控制证书的认证,可选的值为full,none,certificate。此处由于没有证书,所以设置为null,否则启动会提示错误.
  7. elasticsearch.ssl.verificationMode: none
  8. # 搜索数据请求超时时间
  9. elasticsearch.requestTimeout: 30000
  10. # 修改 i18n.locale: “zh-CN” 这样 kibana 界面会显示为中文
  11. i18n.locale: "zk-CN"

启动kibana方式,进入 kibana 的bin目录, 执行 ./kibana (注意:kibana 也不能用 root 用户启动)

打开浏览器, 输入htt//ip:5601地址, 若长时间没有响应,试着开放端口或者关闭防火墙

若要关闭kibana,找到kibana启动的pid,有如下几种方法

  1. sudo ps -ef |grep node
  2. sudo fuser -n tcp 5601
  3. sudo netstat -ntulp|grep 5601 ##或者sudo netstat -anltp|grep 5601
  4. ##然后kill -9 进程号 即可关闭

5、安装LogStash

目的是将上述filebeat推送到kafka的消息记录消费至elasticsearch中,便于接下来kibana搜索elasticsearch

解压缩下载的logstash文件,进入到config目录下修改jvm.options的参数配置,实际情形的物理机应该内存够,我的是虚拟机,配置不够,需要修改;还是在config目录下参考logstash-sample.conf编写配置文件logstash.conf,然后在启动脚本中指定所使用的的配置文件

logstash.conf

注意bootstrap_servers必须是字符串,不能用数组,我启动的时候用数组报错了,然后根据日志改成字符串再启动就成功了

  1. input {
  2. kafka {
  3. bootstrap_servers => "192.168.236.131:9092,192.168.236.130:9092,192.168.236.129:9092"
  4. topics => ["elklog"]
  5. auto_offset_reset => "earliest" #偏移量
  6. codec => json {
  7. charset => "UTF-8"
  8. }
  9. #消费线程数,不大于分区个数consumer_threads => 2
  10. }
  11. # 如果有其他数据源,直接在下面追加
  12. }
  13. ##过滤,可要可不要
  14. filter {
  15. # 将message转为json格式
  16. #if [type] == "log" {
  17. # json {
  18. # source => "message"
  19. # target => "message"
  20. # }
  21. #}
  22. }
  23. output {
  24. # 处理后的日志落到本地文件
  25. #file {
  26. # path => "/config-dir/test.log"
  27. # flush_interval => 0
  28. #}
  29. # 处理后的日志入es
  30. elasticsearch {
  31. hosts => ["192.168.236.131:9200","192.168.236.130:9200","192.168.236.129:9200"]
  32. index => "elklog-%{+YYYYMMdd}"
  33. }
  34. }

后台启动logstash的脚本,注意文件路径

  1. #!/bin/bash
  2. pids=`ps -ef| grep logstash |grep -v grep |awk '{print $2}'`
  3. if [[ -z $pids ]]; then
  4. echo 'process of logstash not exist,begin to run'
  5. nohup ./logstash/bin/logstash -f /home/elker/logstash/config/logstash.conf > /home/elker/nohup.log 2>&1 &
  6. else
  7. echo 'logstash exist,will be killed and restart'
  8. kill -9 $pids
  9. echo 'logstash has been killed...'
  10. echo 'logstash restart now...'
  11. nohup ./logstash/bin/logstash -f /home/elker/logstash/config/logstash.conf > /home/elker/nohup.log 2>&1 &
  12. fi

然后启动准备好的java应用(无需太复杂,这篇文章中的就行,因为日志有json格式的日志),做一笔应用的交易,查看应用的日志,再查看es的数据记录

 完美成功将kafka的记录推送至es,据此elk日志集成搭建成功仅一步之遥

启动kibana,在浏览器查看,初步启动配置一下,然后使用@timestamp,elk日志集成搞定

 我的应用是单点应用没有结合微服务,就没有结合日志链路跟踪,实际在微服务中一笔交易可能涉及多应用之间的服务调用,此时普通的方法根据报文中的某些特殊的内容去搜索日志,然后定位就有些不足了,此时日志链路跟踪就发挥作用了,先根据特殊的字符在某一应用中找到交易的traceId,然后根据traceId去定位分析就很方便快捷

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/68098
推荐阅读
相关标签
  

闽ICP备14008679号