当前位置:   article > 正文

ELK+Beats日志分析系统部署_elk beats

elk beats

阅读目录

一、名词介绍:

  • Elasticsearch:分布式搜索和分析引擎,具有高可伸缩、高可靠和易管理等特点。基于 Apache Lucene 构建,能对大容量的数据进行接近实时的存储、搜索和分析操作。通常被用作某些应用的基础搜索引擎,使其具有复杂的搜索功能;
  • Logstash:数据处理引擎,它支持动态的从各种数据源搜集数据,并对数据进行过滤、分析、丰富、统一格式等操作,然后存储到 ES;
  • Kibana:数据分析和可视化平台。与 Elasticsearch 配合使用,对数据进行搜索、分析和以统计图表的方式展示;
  • Filebeat:ELK 协议栈的新成员,一个轻量级开源日志文件数据搜集器,使用 golang 基于 Logstash-Forwarder 源代码开发,是对它的替代。在需要采集日志数据的 server 上安装 Filebeat,并指定日志目录或日志文件后,Filebeat 就能读取数据,迅速发送到 Logstash 进行解析。

二、应用包准备

elasticsearch-7.2.0.tar.gz

logstash-7.2.0.tar.gz

kibana-7.2.0-linux-x86_64.tar.gz

filebeat-7.2.0-linux-x86_64.tar.gz

jdk-7 以上

三、部署安装:

Logstash 分布式采集

这种架构是对上面架构的扩展,把一个 Logstash 数据搜集节点扩展到多个,分布于多台机器,将解析好的数据发送到 Elasticsearch server 进行存储,最后在 Kibana 查询、生成日志报表等

                                       

这种结构因为需要在各个服务器上部署 Logstash,而它比较消耗 CPU 和内存资源,所以比较适合计算资源丰富的服务器,否则容易造成服务器性能下降,甚至可能导致无法正常工作。

Beats 分布式采集

这种架构引入 Beats 作为日志搜集器。目前 Beats 包括四种:

  • Packetbeat(搜集网络流量数据);
  • Topbeat(搜集系统、进程和文件系统级别的 CPU 和内存使用情况等数据);
  • Filebeat(搜集文件数据);
  • Winlogbeat(搜集 Windows 事件日志数据)。

Beats 将搜集到的数据发送到 Logstash,经 Logstash 解析、过滤后,将其发送到 Elasticsearch 存储,并由 Kibana 呈现给用户

                        

这种架构解决了 Logstash 在各服务器节点上占用系统资源高的问题。相比 Logstash,Beats 所占系统的 CPU 和内存几乎可以忽略不计。另外,Beats 和 Logstash 之间支持 SSL/TLS 加密传输,客户端和服务器双向认证,保证了通信安全。

因此这种架构适合对数据安全性要求较高,同时各服务器性能比较敏感的场景。

如果logstash 压力很大,那么可以考虑filebeat logstash 之间引入rediskafka作为缓冲。

1、jdk安装

一.安装java
解压到当前目录:

tar -xzvf jdk-8u144-linux-x64.tar.gz -C /usr/local/

临时配置 jdk 环境:(重启之后失效)

  1. export JAVA_HOME=/usr/local/w0624/jdk1.8.0_144
  2. export PATH=$JAVA_HOME/bin:$PATH
  3. export CLASSPATH=.

配置在环境变量中

  1. vim ~/.bashrc
  2.  
  3.  
  4. export JAVA_HOME=/usr/local/jdk1.8.0_144
  5. export JRE_HOME=${JAVA_HOME}/jre
  6. export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
  7. export PATH=${JAVA_HOME}/bin:$PATH
  8. source ~/.bashrc

2、搭建ElasticSearch集群

  • 三台机器

          kaikai1            192.168.210.40

          kaikai2            192.168.210.44

          kaikai3            192.168.210.45

  • 环境配置
  1. sudo vim /etc/security/limits.conf
  2. * soft nproc 65536
  3. * hard nproc 65536
  4. * soft nofile 65536
  5. * hard nofile 65536
  6. sudo vim /etc/sysctl.conf
  7. vm.max_map_count=655360
  8. sudo vim /etc/security/limits.d/90-nproc.conf
  9. * soft nproc 4096
  10. sysctl -p
  • ElasticSearch配置
  1. mkdir -p /data/es-data
  2. mkdir -p /var/log/elasticsearch
  3. chown -R user:user /data/es-data
  4. chown -R user:user /var/log/elasticsearch
  •  解压准备好的安装包
tar -zxvf elasticsearch-7.2.0-linux-x86_64.tar.gz
  • 192.168.210.40 机器配置
  1. vim elasticsearch-7.2.0/config/elasticsearch.yml
  2. # 节点和集群名
  3. cluster.name: es-ELK # 三台机器集群名必须一样
  4. node.name: node-1 # 节点名
  5. node.master: true # 是否设置为主节点
  6. node.data: true # 是否设置为数据节点
  7. # 日志和数据目录
  8. path.data: /data/es-data/ # 数据目录
  9. path.logs: /var/log/elasticsearch/ # 日志目录
  10. # 访问的host和port设置
  11. network.host: 0.0.0.0
  12. http.port: 9200
  13. # 设置跨域请求,为后面的界面化展示使用
  14. http.cors.enabled: true
  15. http.cors.allow-origin: "*"
  16. # 节点挂载
  17. discovery.seed_hosts: ["192.168.210.40", "192.168.210.44","192.168.210.45"]
  18. discovery.zen.minimum_master_nodes: 2
  19. # 使用初始的一组符合主节点条件的节点引导集群 一般与node.name相同
  20. cluster.initial_master_nodes: ["node-1"]
  • 192.168.210.44 机器配置
  1. vim elasticsearch-7.2.0/config/elasticsearch.yml
  2. # 节点和集群名
  3. cluster.name: es-ELK # 三台机器集群名必须一样
  4. node.name: node-2 # 节点名
  5. node.master: false # 是否设置为主节点
  6. node.data: true # 是否设置为数据节点
  7. # 日志和数据目录
  8. path.data: /data/es-data/ # 数据目录
  9. path.logs: /var/log/elasticsearch/ # 日志目录
  10. # 访问的host和port设置
  11. network.host: 0.0.0.0
  12. http.port: 9200
  13. # 设置跨域请求,为后面的界面化展示使用
  14. http.cors.enabled: true
  15. http.cors.allow-origin: "*"
  16. # 节点挂载
  17. discovery.seed_hosts: ["192.168.210.40", "192.168.210.44","192.168.210.45"]
  18. discovery.zen.minimum_master_nodes: 2
  19. # 使用初始的一组符合主节点条件的节点引导集群 一般与node.name相同
  20. cluster.initial_master_nodes: ["node-2"]
  • 192.168.210.45 机器配置
  1. vim elasticsearch-7.2.0/config/elasticsearch.yml
  2. # 节点和集群名
  3. cluster.name: es-ELK # 三台机器集群名必须一样
  4. node.name: node-3 # 节点名
  5. node.master: false # 是否设置为主节点
  6. node.data: true # 是否设置为数据节点
  7. # 日志和数据目录
  8. path.data: /data/es-data/ # 数据目录
  9. path.logs: /var/log/elasticsearch/ # 日志目录
  10. # 访问的host和port设置
  11. network.host: 0.0.0.0
  12. http.port: 9200
  13. # 设置跨域请求,为后面的界面化展示使用
  14. http.cors.enabled: true
  15. http.cors.allow-origin: "*"
  16. # 节点挂载
  17. discovery.seed_hosts: ["192.168.210.40", "192.168.210.44","192.168.210.45"]
  18. discovery.zen.minimum_master_nodes: 2
  19. # 使用初始的一组符合主节点条件的节点引导集群 一般与node.name相同
  20. cluster.initial_master_nodes: ["node-3"]
  • 三台机器分别启动
nohup ./bin/elasticsearch &
  •  执行  curl -X GET http://192.168.210.40:9200 出现以下结果表示安装成功
  1. {
  2.   "name" : "node-1",
  3.   "cluster_name" : "AI-ELK",
  4.   "cluster_uuid" : "pOTpS1cwRRGqiRU4BCY4Qg",
  5.   "version" : {
  6.     "number" : "7.2.0",
  7.     "build_flavor" : "default",
  8.     "build_type" : "tar",
  9.     "build_hash" : "508c38a",
  10.     "build_date" : "2019-06-20T15:54:18.811730Z",
  11.     "build_snapshot" : false,
  12.     "lucene_version" : "8.0.0",
  13.     "minimum_wire_compatibility_version" : "6.8.0",
  14.     "minimum_index_compatibility_version" : "6.0.0-beta1"
  15.   },
  16.   "tagline" : "You Know, for Search"
  17. }
  • 常用的命令
  1. 查询所有数据:curl http://192.168.210.40:9200/_search?pretty
  2. 集群健康状态:curl -XGET http://192.168.210.40:9200/_cluster/health?pretty
  3. 删除所有数据:curl -X DELETE 'http://192.168.210.40:9200/_all'
  4. 删除指定索引:curl -X DELETE 'http://192.168.210.40:9200/索引名称'

注: 这里最好不要使用localhost,因为我们需要远程访问,所以,我们应该直接使用对应服务器的ip地址

到此就安装成功了

  • 下面提供一个安装elasticsearch界面展示在192.168.210.40上安装即可

          git clone git://github.com/mobz/elasticsearch-head.git

  • node安装

           1.安装

              $ sudo apt-get install nodejs

              $ sudo apt-get install npm

          2.升级

                  $ sudo npm install npm -g 

                  $ npm install –g n

                  $ n latest(升级node.js到最新版) or $ n stable(升级node.js到最新稳定版)

                      n后面也可以跟随版本号比如:$ n v0.10.26 或者 $ n 0.10.26

         3.npm镜像替换为淘宝镜像

                 $ npm config set registry http://registry.npm.taobao.org/

                 $ npm get registry 

  • 启动elaticsearch界面项目 
  1. npm install
  2. npm run start

访问http://192.168.210.40:9100/

OK  搜索集群已经搭建好了,一路向前,莫问前程!!!

3、搭建Logstash

  • 解压准备好的安装包
tar -zxvf logstash-7.2.0.tar.gz
  •   编写 采集个日志互相隔离       并进行对标准的日志进行分词

普通启动方式:

nohup bin/logstash -f config/app.yml &

多配置文件启动方式:

nohup bin/logstash -f config/yml/ &

注意:yml是个文件夹下面可以有很多个.yml文件

  1. vim log1.yml
  2. input {
  3. file {
  4. type => "log1"
  5. path => "/var/log/tdsp/t.log"
  6. discover_interval => 10 # 监听间隔
  7. start_position => "beginning"
  8. }
  9. #beats{
  10. # port => "5045"
  11. # }
  12. }
  13. filter {
  14. mutate {
  15. split => {"message" => "|"}
  16. }
  17. mutate {
  18. add_field => {
  19. "date" => "%{[message][0]}"
  20. "grade" => "%{[message][1]}"
  21. "infosource" => "%{[message][2]}"
  22. "msg" => "%{[message][3]}"
  23. "ip2long" => "%{[message][4]}"
  24. }
  25. }
  26. mutate {
  27. convert => {
  28. "date" => "string"
  29. "grade" => "string"
  30. "infosource" => "string"
  31. "msg" => "string"
  32. "ip2long" => "string"
  33. }
  34. }
  35. }
  36. output {
  37. if [type] == "log1" {
  38. elasticsearch {
  39. hosts => ["192.168.210.40:9200","192.168.210.44:9200","192.168.210.45:9200"]
  40. index => "kaikai-%{+YYYY-MM-dd}"
  41. }
  42. }
  43. }
  44. # 输出在控制台进行调试时候使用
  45. #output {
  46. # if [type] == "log1" {
  47. # stdout {codec => rubydebug}
  48. # }
  49. #}
  1. vim log2.yml
  2. input {
  3. file {
  4. type => "log2"
  5. path => "/var/log/tdsp/w.log"
  6. discover_interval => 10 # 监听间隔
  7. start_position => "beginning"
  8. }
  9. #beats{
  10. # port => "5045"
  11. # }
  12. }
  13. filter {
  14. mutate {
  15. split => {"message" => "|"}
  16. }
  17. mutate {
  18. add_field => {
  19. "date" => "%{[message][0]}"
  20. "grade" => "%{[message][1]}"
  21. "infosource" => "%{[message][2]}"
  22. "msg" => "%{[message][3]}"
  23. "ip2long" => "%{[message][4]}"
  24. }
  25. }
  26. mutate {
  27. convert => {
  28. "date" => "string"
  29. "grade" => "string"
  30. "infosource" => "string"
  31. "msg" => "string"
  32. "ip2long" => "string"
  33. }
  34. }
  35. }
  36. output {
  37. if [type] == "log2" {
  38. elasticsearch {
  39. hosts => ["192.168.210.40:9200","192.168.210.44:9200","192.168.210.45:9200"]
  40. index => "test-%{+YYYY-MM-dd}"
  41. }
  42. }
  43. }
  44. #output {
  45. # if [type] == "log2" {
  46. # stdout {codec => rubydebug}
  47. # }
  48. #}

多个日志互相隔离方法二模板

  1. input {
  2. file {
  3. type => "log1"
  4. path => "/xxx/xxx/*.log"
  5. discover_interval => 10
  6. start_position => "beginning"
  7. }
  8. file {
  9. type => "log2"
  10. path => "/xxx/xxx/*.log"
  11. discover_interval => 10
  12. start_position => "beginning"
  13. }
  14. file {
  15. type => "log3"
  16. path => "/xxx/xxx/*.log"
  17. discover_interval => 10
  18. start_position => "beginning"
  19. }
  20. #beats{
  21. # port => "5045"
  22. # }
  23. }
  24. filter {
  25. if [type] == "log1" {
  26. mutate {
  27. split => {"message" => "|"} # 分割日志
  28. }
  29. mutate {
  30. add_field => {
  31. "x1" => "%{[message][0]}"
  32. "x2" => "%{[message][1]}"
  33. "x3" => "%{[message][2]}"
  34. }
  35. }
  36. mutate {
  37. convert => {
  38. "x1" => "string"
  39. "x2" => "string"
  40. "x3" => "string"
  41. }
  42. }
  43. json {
  44. source => "xxx"
  45. target => "xxx"
  46. }
  47. mutate {
  48. remove_field => ["xxx","xxx","xxx","xxx"] # 删除字段
  49. }
  50. }
  51. else if [type] == "log2" {
  52. mutate {
  53. split => {"message" => "|"}
  54. }
  55. mutate {
  56. add_field => {
  57. "x1" => "%{[message][0]}"
  58. "x2" => "%{[message][1]}"
  59. "x3" => "%{[message][2]}"
  60. }
  61. }
  62. mutate {
  63. convert => {
  64. "x1" => "string"
  65. "x2" => "string"
  66. "x3" => "string"
  67. }
  68. }
  69. json {
  70. source => "xxx"
  71. target => "xxx"
  72. }
  73. mutate {
  74. remove_field => ["xxx","xxx","xxx","xxx"]
  75. }
  76. }
  77. }
  78. output {
  79. if [type] == "log1" {
  80. elasticsearch {
  81. hosts => ["192.168.210.40:9200","192.168.210.44:9200","192.168.210.45:9200"]
  82. index => "log1-%{+YYYY-MM-dd}"
  83. }
  84. }
  85. else if [type] == "log2" {
  86. elasticsearch {
  87. hosts => ["192.168.210.40:9200","192.168.210.44:9200","192.168.210.45:9200"]
  88. index => "log2-%{+YYYY-MM-dd}"
  89. }
  90. }
  91. }
  92. #output {
  93. # stdout {codec => rubydebug}
  94. #}

启动测试

写入两条日志,在es集群中进行查看是否收集到日志

echo "2019-11-18 13:53:35|ERROR|MQTT connected error|错1|2130708993" >> /var/log/tdsp/t.log

echo "2019-11-18 13:53:35|ERROR|MQTT connected error|错1|2130708993" >> /var/log/tdsp/w.log

生成test-2019-11-23  和  kaikai-2019-11-23两个切片表示已经收集成功

OK logstash 部署成功,我住长江头,君住长江尾,日日思君不见君,共饮长江水。。。

4.搭建kibana

部署机器192.168.210.42

tar -zxvf kibana-7.2.0-linux-x86_64.tar.gz

更改配置

  1. kibana-7.2.0-linux-x86_64/config/kibana.yml
  2. server.port: 5602
  3. server.host: "0.0.0.0"
  4. elasticsearch.hosts: ["http://192.168.210.40:9200"]
  5. # 设置语言
  6. i18n.locale: "zh-CN"

启动

nohup ./bin/kibana &

访问:http://192.168.210.42:5602/

OK 部署成功    具体操作后面在介绍    我只是敢和别人不一样

5.搭建FileBeat

  • fileBeat 为轻量级采集日志工具
tar -zxvf filebeat-7.2.0-linux-x86_64.tar.gz

介绍一个filebeat采集日志,发送到logstash进行日志分词处理,在推送到elasticsearch集群中收集,最后在kibana中进行生成日志报表,进行分析

  • filebeat 日志采集配置   和产生日志的机器上进行部署
  1. vim aap.yml
  2. filebeat.inputs:
  3. - type: log
  4. enabled: true
  5. paths:
  6. - /var/log/tdsp/*.log
  7. # tags: ["nginx"] # 添加自定义的tag
  8. setup.template.settings:
  9. index.number_of_shards: 3
  10. #output.console:
  11. # pretty: true
  12. # enable: true
  13. output.logstash:
  14. hosts: ["192.168.210.41:5044"]
  15. #filebeat.config.modules:
  16. # path: ${path.config}/modules.d/*.yml
  17. # reload.enabled: false
  18. #setup.kibana:
  19. # host: "192.168.210.42:5602"
  20. #output.elasticsearch:
  21. # hosts: ["192.168.210.40","192.168.210.44","192.168.210.45"]
  22. # 测试控制台输入输出
  23. #filebeat.inputs:
  24. #- type: stdin
  25. # enabled: true
  26. #setup.template.settings:
  27. # index.number_of_shards: 3
  28. #output.console:
  29. # pretty: true
  30. # enable: true
  • logstash 设置配置接受frlebeat采集的日志,并处理分词    192.168.210.41 机器
  1. nput {
  2. beats{
  3. port => "5044"
  4. }
  5. }
  6. filter {
  7. mutate {
  8. split => {"message" => "|"}
  9. }
  10. mutate {
  11. add_field => {
  12. "date" => "%{[message][0]}"
  13. "grade" => "%{[message][1]}"
  14. "infosource" => "%{[message][2]}"
  15. "msg" => "%{[message][3]}"
  16. "ip2long" => "%{[message][4]}"
  17. }
  18. }
  19. mutate {
  20. convert => {
  21. "date" => "string"
  22. "grade" => "string"
  23. "infosource" => "string"
  24. "msg" => "string"
  25. "ip2long" => "string"
  26. }
  27. }
  28. }
  29. output {
  30. elasticsearch {
  31. hosts => ["192.168.210.40:9200","192.168.210.44:9200","192.168.210.45:9200"]
  32. index => "kaikai-%{+YYYY-MM-dd}"
  33. }
  34. }
  35. #output {
  36. # stdout {codec => rubydebug}
  37. #}

 logstash 设置配置采集的日志,并处理分词    索引以文件名命名

 

  1. input {
  2. file {
  3. type => "info_log"
  4. path => "/opt/kaikai.log"
  5. discover_interval => 10 # 监听间隔
  6. start_position => "end"
  7. # sincedb_path => "/usr/share/logstash/sincedb_kaikai"
  8. #start_position => "beginning"
  9. }
  10. file {
  11. type => "error_log"
  12. path => "/opt/error.log"
  13. discover_interval => 10
  14. start_position => "beginning"
  15. }
  16. }
  17. filter {
  18. grok {
  19. match => { "[log][file][path]" => "/(?<logfilename>[^/]+)\.log$" }
  20. }
  21. if [type] == "info_log" {
  22. # 首先,分割消息字段
  23. mutate {
  24. split => { "message" => "|" }
  25. }
  26. # 使用 Ruby 代码处理不同的字段数量情况
  27. ruby {
  28. code => "
  29. message = event.get('message')
  30. path = event.get('path')
  31. if message.length == 5
  32. # 如果字段数量正好是5,直接分配
  33. event.set('date', message[0].strip)
  34. event.set('grade', message[1].strip)
  35. event.set('pid', message[2].strip)
  36. event.set('code_location', message[3].strip)
  37. event.set('msg', message[4].strip)
  38. elsif message.length > 5
  39. # 如果字段数量大于5,合并超出的字段到最后一个字段
  40. event.set('date', message[0].strip)
  41. event.set('grade', message[1].strip)
  42. event.set('pid', message[2].strip)
  43. event.set('code_location', message[3].strip)
  44. # 合并剩余的所有字段到最后一个字段,并去除每个字段前后的空格,然后用空格连接它们
  45. # event.set('msg', message[4..-1].map(&:strip).join('|'))
  46. event.set('msg', message[4..-1].join('|'))
  47. else
  48. # 如果字段数量不足,将整个消息视为格式不正确
  49. #event.set('format_error', message.join('|').strip)
  50. #event.tag('format_error')
  51. event.set('msg', message)
  52. end
  53. "
  54. }
  55. # 如果需要,处理格式错误的情况
  56. # 例如,可以选择将格式错误的事件发送到特定的文件或索引
  57. }
  58. else if [type] == "error_log" {
  59. }
  60. }
  61. output {
  62. if [type] == "info_log" {
  63. if "format_error" in [tags] {
  64. elasticsearch {
  65. hosts => ["192.168.142.106:9200"]
  66. index => "%{logfilename}-%{+YYYY-MM-dd}"
  67. }
  68. } else {
  69. elasticsearch {
  70. hosts => ["192.168.142.106:9200"]
  71. index => "%{logfilename}-%{+YYYY-MM-dd}"
  72. # index => "path-%{+YYYY-MM-dd}"
  73. }
  74. }
  75. stdout { codec => rubydebug }
  76. }
  77. else if [type] == "error_log" {
  78. elasticsearch {
  79. hosts => ["192.168.142.106:9200"]
  80. index => "%{filename}-%{+YYYY-MM-dd}"
  81. }
  82. }
  83. }

 

  • logstash启动 日志过滤
./bin/logstash -f app.yml
  • 启动filebeat 采集日志
./filebeat -e -c app.yml
  • 输入日志到/var/log/tdsp/
 echo '2019-11-18 13:53:35|ERROR|MQTT connected error|错1|2130708993' >> /var/log/tdsp/a.log

在集群中生成了filebeat所采集的日志,最后在kibana中进行日志索引,进行生成日志报表即可 

官方文档及下载Elasticsearch Platform — Find real-time answers at scale | Elastic

Elasticsearch集群详细介绍Elasticsearch集群详细介绍_elasticsearch集群图解-CSDN博客

Filebeat工作原理ELK ----- Filebeat工作原理_elk+filebeat工作原理-CSDN博客

日志采集工具MetricbeatELK 日志采集工具Metricbeat_elk收集不到metric-CSDN博客

Kibana介绍ELK---Kibana介绍_kibanan 15672-CSDN博客

Logstash介绍ELK ---Logstash介绍_@elklog注解是什么意思-CSDN博客

感谢两个徒弟waner,hang整理的文档人生没有白走的路,每一步都算数!!!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/421594
推荐阅读
相关标签
  

闽ICP备14008679号