当前位置:   article > 正文

ELk + kafka + filebeat日志系统搭建_elk kafka filebeat

elk kafka filebeat

一 日志系统架构图:

简易版

标准版

logstash 和filebeat都具有日志收集功能,filebeat更轻量,占用资源更少,但logstash 具有filter功能,能过滤分析日志。一般结构都是filebeat采集日志,然后发送到消息队列,redis,kafaka。然后logstash去获取,利用filter功能过滤分析,然后存储到elasticsearch

二、elk安装:

可以参考网址:https://www.iyunv.com/thread-443374-1-1.html

2.1 配置es

解压文件

1
tar zxvf elasticsearch-6.2.4.tar.gz


由于es不能用root账户启动,所以需要添加一个非root账户

1
useradd es


修改es文件夹的权限

1
chown -R es:es elasticsearch-6.2.4


修改配置文件

1
vi /opt/elk/elasticsearch-6.2.4/config/elasticsearch.yml


修改elasticsearch.yml的内容如下

  1. 1
  2. 2
  3. 3
  4. 4
  5. 5
  6. 6
  7. 7
  8. 8
  1. #端口
  2. http.port: 9200
  3. #ip
  4. network.host: 192.168.188.111
  5. #data路径
  6. path.data: /opt/elk/elasticsearch-6.2.4/data
  7. #logs路径
  8. path.logs: /opt/elk/elasticsearch-6.2.4/logs

创建data和logs文件夹

1
mkdir data logs


启动es(为了方便观察日志没有后台启动)

1
./bin/elasticsearch

安装过程遇到的问题

[1]: max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]

vim /etc/security/limits.conf

# 在最后面追加下面内容

  1. * soft nofile 65536
  2. * hard nofile 131072
  3. * soft nproc 2048
  4. * hard nproc 4096

 

 

[2]: max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]

vi /etc/sysctl.conf

添加下面配置:

vm.max_map_count=262144

并执行命令:

 

sysctl -p

2.2 logstash 配置

解压文件

1
tar zxvf logstash-6.2.4.tar.gz


进入config文件夹新建log4j_es.conf文件并编写内容

1
vi log4j_es.conf


log4j_es.conf内容

  1. 1
  2. 2
  3. 3
  4. 4
  5. 5
  6. 6
  7. 7
  8. 8
  9. 9
  10. 10
  11. 11
  12. 12
  13. 13
  14. 14
  15. 15
  16. 16
  17. 17
  1. input {
  2. # https://www.elastic.co/guide/en/logstash/current/plugins-inputs-log4j.html
  3. tcp {
  4. mode => "server"
  5. host => "192.168.188.110"
  6. port => 4560
  7. codec => json_lines
  8. }
  9. }
  10. output {
  11. elasticsearch{
  12. hosts => ["192.168.188.110:9200"]
  13. index => "log4j-%{+YYYY.MM.dd}"
  14. document_type => "log4j_type"
  15. }
  16. stdout { codec => rubydebug }
  17. }


启动logstash (为了方便观察日志没有后台启动)

1
./bin/logstash -f config/log4j_es.conf


2.3 kibana 配置

解压

1
tar zxvf kibana-6.2.4-linux-x86_64.tar.gz


修改配置文件kibana.yml

  1. 1
  2. 2
  3. 3
  4. 4
  5. 5
  6. 6
  1. server.port: 5601
  2. # To allow connections from remote users, set this parameter to a non-loopback address.
  3. server.host: "192.168.188.110"
  4. # The URL of the Elasticsearch instance to use for all your queries.
  5. elasticsearch.url: "http://192.168.188.110:9200"


启动kibana

1
./bin/kibana

 

3.kibana界面设置

进入kibana界面
点击 Management 点击Index Patterns
在 Create index pattern 的文本框输入索引名称,因为我在logstash中设置索引为 log4j-%{+YYYY.MM.dd},所以我们填写 log4j-* 点击下一步设置直到完成。
点击Discovery可以看到我们的日志了。

三、kafka和zookeeper安装与配置

zookeeper的安装参考 :zookeeper集群搭建步骤

kafka的安装参考: Kafka集群搭建步骤

启动zk:

1
zkServer.sh start

 启动kafka

启动kafka

1
# bin/kafka-server-start.sh config/server.properties


创建topic

1
# bin/kafka-topics.sh --create --zookeeper 192.168.188.110:2181 --replication-factor 1 --partitions 1 --topic test


创建生产者

1
# bin/kafka-console-producer.sh --broker-list 192.168.188.110:9092 --topic test


创建消费者

1
bin/kafka-console-consumer.sh --zookeeper 192.168.188.110:2181 --topic test --from-beginning

 

四. 集成logstash与kafka

参考:
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
vi logstash_agent.conf

 

  1. 1
  2. 2
  3. 3
  4. 4
  5. 5
  6. 6
  7. 7
  8. 8
  9. 9
  10. 10
  11. 11
  12. 12
  13. 13
  14. 14
  15. 15
  16. 16
  17. 17
  18. 18
  19. 19
  20. 20
  21. 21
  1. input {
  2. kafka {
  3. bootstrap_servers => "192.168.188.110:9092"
  4. #topic id
  5. topics => "test"
  6. codec => "json"
  7. }
  8. }
  9. filter {
  10. }
  11. output {
  12. elasticsearch{
  13. hosts => ["192.168.188.110:9200"]
  14. index => "kafka-%{+YYYY.MM.dd}"
  15. }
  16. stdout { codec => rubydebug }
  17. }

完整配置文件可参考:test.conf

启动elk之后,在kafka生产者输入一条信息之后logstash打印出对应的内容,说明集成成功了
。在kibana设置对应的日志。

4. 安装filebeat

解压

1
tar -zxvf filebeat-6.2.3-linux-x86_64.tar.gz

 

配置filebeat.yml

  1. 1
  2. 2
  3. 3
  4. 4
  5. 5
  6. 6
  7. 7
  8. 8
  9. 9
  10. 10
  11. 11
  12. 12
  13. 13
  14. 14
  15. 15
  16. 16
  17. 17
  18. 18
  1. filebeat.prospectors:
  2. # 这里面配置实际的日志路径
  3. - type: log
  4. enabled: true
  5. paths:
  6. - /var/log/test1.log
  7. fields:
  8. log_topics: test1
  9. - type: log
  10. enabled: true
  11. paths:
  12. - /var/log/test2.log
  13. fields:
  14. log_topics: test2
  15. output.kafka:
  16. enabled: true
  17. hosts: ["10.112.101.90:9092"]
  18. topic: '%{[fields][log_topics]}'

完整配置文件可参考:filebeat.yml

运行

1
./filebeat -e -c filebeat.yml

 

把logstash停止,修改配置文件,添加filebeat配置的topic。重启logstash。打开kibana刷新页面,看到filebeat收集到的日志

清除es index及logstash日志相关脚本(仅供参考)如下:

#/bin/bash
# clean elastic history index data
# author lujie
# createTime 2018-06-09 12:00:12

CLEAR_LOGSTASH_TIME=`date +%Y-%m-%d -d "7 days ago"`
#delete logstash logs
DELETE_LOGS=/data/elk/logstash-6.2.4/logs/logstash-plain-$CLEAR_LOGSTASH_TIME*
if [ ! -n "$DELETE_LOGS" ]; then 
    echo -e '\n get logstash path is null,check it!' 
else
    echo -e '\n delete logstash logs start...'
    echo -e '\n execute common is:  rm -rf '$DELETE_LOGS 
    rm -rf $DELETE_LOGS:
    echo -e '\n delete logstash logs fininshed!'
fi    

echo -e '\n delete elasticsearch index start...'
#define clear_time current is 7 days ago
CLEAR_TIME=`date +%Y.%m.%d -d "7 days ago"`
#curl delete request formatter is  xxx-YYYY.MM.dd
curl -XDELETE 'http://192.168.0.45:9200/*-'${CLEAR_TIME}
echo -e '\n delete elasticsearch index fininshed!'
 

注意:
在logstash中,当input里面有多个kafka输入源时,client_id => "xxx"必须添加且需要不同,否则会报错javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-0。

  1. input {
  2. kafka {
  3. group_id => "es1"
  4. }
  5. kafka {
  6. group_id => "es2"
  7. }
  8. }

 

 

vim /etc/security/limits.conf

 

# 在最后面追加下面内容

*** hard nofile 65536

*** soft nofile 65536

五 ELK相关配置文件

  1. 定时清理日志脚本位置:
    /data/elk/elk_logs_clean.sh
    执行脚本在crontab里面配置:
    /etc/crontab,如果定时任务不执行,可以查看crontab执行情况 tail -f /var/log/cron

    # Example of job definition:

    # .---------------- minute (0 - 59)

    # |  .------------- hour (0 - 23)

    # |  |  .---------- day of month (1 - 31)

    # |  |  |  .------- month (1 - 12) OR jan,feb,mar,apr ...

    # |  |  |  |  .---- day of week (0 - 6) (Sunday=0 or 7) OR sun,mon,tue,wed,thu,fri,sat

    # |  |  |  |  |

    # *  *  *  *  * user-name  command to be executed

      0  0  *  *  * root /data/elk/elk_logs_clean.sh

  2. filebeat配置: 

    /data/elk_agent/filebeat-6.2.4-linux-x86_64/filebeat.yml

    # 指定日志类型,可以配置多个

    - type: log

      # Change to true to enable this prospector configuration.

      enabled: true

      #指定日志文件路径,注意这个配置不能递归日志目录

      paths:

        - /data/s2b/ares/log/*.out

      #指定kafka的topic

      fields:

        log_topics: ares_test

      ##多行日志合并成一行,这里讲日期、Hibernate、ES开头的设置为一行

      multiline:

        pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}|^Hibernate|^ES'

        negate: true

        match: after

     

    - type: log

      enabled: true

      paths:

        - /data/s2b/boss/log/*.out

      fields:

        log_topics: boss_test

      multiline:

        pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}|^Hibernate|^ES'

        negate: true

        match: after

    - type: log

     

      # Change to true to enable this prospector configuration.

      enabled: true

     

      # Paths that should be crawled and fetched. Glob based paths.

      paths:

        - /data/s2b/mobile/log/*.out

      fields:

        log_topics: mobile_test

      multiline:

        pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}|^Hibernate|^ES'

        negate: true

        match: after  

    - type: log

     

      # Change to true to enable this prospector configuration.

      enabled: true

     

      # Paths that should be crawled and fetched. Glob based paths.

      paths:

        - /data/s2b/site/log/*.out

      fields:

        log_topics: site_test

      # Exclude lines. A list of regular expressions to match. It drops the lines that are

      # matching any regular expression from the list.

      #exclude_lines: ['^DBG']

      ##用时间为开头或者堆栈信息合并到一行

      multiline:

        pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}|^Hibernate|^ES'

        negate: true

        match: after

    - type: log

      # Change to true to enable this prospector configuration.

      enabled: true

      # Paths that should be crawled and fetched. Glob based paths.

      paths:

        - /data/s2b/supplier/log/*.out

      fields:

        log_topics: supplier_test

      multiline:

        pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}|^Hibernate|^ES'

        negate: true

        match: after          

    #================================ General =====================================

    name: 3.0QA

    #输出到kafka

    output.kafka:

      enabled: true

      hosts: ["101.124.4.158:9092"]

      topic: '%{[fields][log_topics]}'

    详细配置可以参考:https://www.elastic.co/guide/en/beats/filebeat/5.5/index.html

  3.  logstash配置:/data/elk/logstash-6.2.4/config/test.conf

    input {

      #可以配置多个kafka

      kafka {

        bootstrap_servers => "192.168.0.45:9092"

        #topic ids可以配置kafka topic数组

        topics => "yinlian"

        consumer_threads => 1

        decorate_events => true

        codec => json {

                charset => "UTF-8"

            }

        auto_offset_reset => "latest"

        #指定type,在output处用到

        type => "yinlian"

      }

      kafka {

        bootstrap_servers => "192.168.0.45:9092"

        #topic ids可以配置kafka topic数组

        topics => ["boss_test","mobile_test","site_test","supplier_test"]

        consumer_threads => 1

        decorate_events => true

       codec => json {

                charset => "UTF-8"

            }

        auto_offset_reset => "latest"

        type => "3.0test"

      }

    }

    #过滤器,暂时没有用到这个配置

    filter {

        

    }

    output {

      if[type] == "yinlian" {

          elasticsearch{

            hosts => ["192.168.0.45:9200"]

            index => "yinlian-%{+YYYY.MM.dd}"

          }

      }

      if[type] == "3.0test"{

         elasticsearch{

            hosts => ["192.168.0.45:9200"]

            index => "3.0test-%{+YYYY.MM.dd}"

          }

      }

       stdout { codec => rubydebug }

    }

    虽让上面的配置能让系统运行起来,我们能筛选内容,但是索引里面有好多我们用不到的字段,接下来我们对索引字段处理

  4. 处理ES索引字段
    在进行下面的配置之前我们需要先熟悉下logstash的mutate和grok语法
    mutate语法可以参考:
    https://doc.yonyoucloud.com/doc/logstash-best-practice-cn/filter/mutate.html

  5. 在线调试grok表达式地址如下:
    http://grokdebug.herokuapp.com/
    grok内置的正则可以参考:
    https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
    下面一个简单的正则就可以找到我们想要的数据

  6. 配置好我们需要的字段之后,重启logstash在kibana观察索引字段的变化。

 

六 遇到的问题

 

  1.  logstash时差八小时问题:

    修改logstash的配置文件,在filter内修改timestamp标签

    如下使用自定义的timestamp替换内置的:

    date {
    match => ["message","UNIX_MS"]
    target => "@timestamp"
    }
    ruby {
    code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
    }
    ruby {
    code => "event.set('@timestamp',event.get('timestamp'))"
    }
    mutate {
    remove_field => ["timestamp"]
    }

  2. logstash解析java日志报json错误

    原计划将kafka输出的message解析成json格式,在logstash中的filter添加如下配置

    json {
     source => "message"
     target => "message"
     }

    在运行的时候一直报json解析失败。从kafka拿到具体的topic数据发现,kafka发送给logstash的是json格式massage只是具体日志内容,注释这个配置即可。目前filter默认没有配置任何东西
     

  3. filebeat输出java的错误日志不完整:
    特别是打印java堆栈数据的时候每条日志只能打印一点不能完整展示。在官方文档找到可以采取配置多行追加的模式  

 

      - type: log

         # Change to true to enable this prospector configuration.
        enabled: true

        # Paths that should be crawled and fetched. Glob based paths.
        paths:
              - D:\htdocs\logs\*.log
        fields:
            log_topics: testlocal
        multiline:

              ##将日志以时间开头的作为一行(用于java日志多行合并)
            pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
           negate: true
           match: after 

4. filebeat读取文件,怎么判断上次读取的位置?

在filebeat的data牡蛎有个文件是 registy,用文本编辑器打开可以看到里面记录了,读取文件的相关信息如下,如果想重新读取文件停止filebeat后删除该文件即可:

[{"source":"/data/Tomcat/logs/catalina.out","offset":1048680657,"timestamp":"2018-06-06T11:57:18.215705238+08:00","ttl":-2,"type":"log","FileStateOS":{"inode":100671962,"device":64769}},
{"source":"/htdocs/logs/kstore_haoxianggou_site.log","offset":0,"timestamp":"2018-06-06T12:44:29.718548093+08:00","ttl":-1,"type":"log","FileStateOS":{"inode":88086231,"device":64769}},
{"source":"/htdocs/logs/kstore_haoxianggou_site_error.log","offset":0,"timestamp":"2018-06-06T12:44:29.721257882+08:00","ttl":-1,"type":"log","FileStateOS":{"inode":88086232,"device":64769}}
]

七 logstash和其他组件分析

1.Logstash

优势:

拥有很多的插件,使用起来很灵活。网上相应的资源比较丰富,以下例子可以参考下:

5 minute intro

reindexing data in Elasticsearch

parsing Elasticsearch logs

rewriting Elasticsearch slowlogs so you can replay them with JMeter

劣势:

1.Logstash 致命的问题是它的性能以及资源消耗(默认的堆大小是 1GB)。尽管它的性能在近几年已经有很大提升,与它的替代者们相比还是要慢很多的。

这里有 Logstash 与 rsyslog 性能对比以及Logstash 与 filebeat 的性能对比。它在大数据量的情况下会是个问题。

2. 目前不支持缓存,目前的典型替代方案是将 Redis 或 Kafka 作为中心缓冲池:

 

应用场景:

因为 Logstash 自身的灵活性以及网络上丰富的资料,Logstash 适用于原型验证阶段使用,或者解析非常的复杂的时候。

在不考虑服务器资源的情况下,如果服务器的性能足够好,我们也可以为每台服务器安装 Logstash 。我们也不需要使用缓冲,因为文件自身就有缓冲的行为,而 Logstash 也会记住上次处理的位置。

如果服务器性能较差,并不推荐为每个服务器安装 Logstash ,这样就需要一个轻量的日志传输工具,将数据从服务器端经由一个或多个 Logstash 中心服务器传输到 Elasticsearch:

随着日志项目的推进,可能会因为性能或代价的问题,需要调整日志传输的方式(log shipper)。当判断 Logstash 的性能是否足够好时,重要的是对吞吐量的需求有着准确的估计,这也决定了需要为 Logstash 投入多少硬件资源。

2.Filebeat:

Filebeat 是一个轻量级的日志传输工具,它的存在正弥补了 Logstash 的缺点:Filebeat 作为一个轻量级的日志传输工具可以将日志推送到中心 Logstash。

在版本 5.x 中,Elasticsearch 具有解析的能力(像 Logstash 过滤器)— Ingest。这也就意味着可以将数据直接用 Filebeat 推送到 Elasticsearch,并让 Elasticsearch 既做解析的事情,又做存储的事情。也不需要使用缓冲,因为 Filebeat 也会和 Logstash 一样记住上次读取的偏移:

如果需要缓冲(例如,不希望将日志服务器的文件系统填满),可以使用 Redis/Kafka,因为 Filebeat 可以与它们进行通信:

 

 

优势:

Filebeat 只是一个二进制文件没有任何依赖,使用go语言开发。它占用资源极少。尽管它还十分年轻,正式因为它简单,所以几乎没有什么可以出错的地方,所以它的可靠性还是很高的。

它也为我们提供了很多可以调节的点,例如:它以何种方式搜索新的文件,以及当文件有一段时间没有发生变化时,何时选择关闭文件句柄。

劣势:

Filebeat 的应用范围十分有限,所以在某些场景下我们会碰到问题。例如,如果使用 Logstash 作为下游管道,我们同样会遇到性能问题。正因为如此,Filebeat 的范围在扩大。开始时,它只能将日志发送到 Logstash 和 Elasticsearch,而现在它可以将日志发送给 Kafka 和 Redis,在 5.x 版本中,它还具备过滤的能力。

典型应用场景

Filebeat 在解决某些特定的问题时:日志存于文件,我们希望

将日志直接传输存储到 Elasticsearch。这仅在我们只是抓去(grep)它们或者日志是存于 JSON 格式(Filebeat 可以解析 JSON)。或者如果打算使用 Elasticsearch 的 Ingest 功能对日志进行解析和丰富。

将日志发送到 Kafka/Redis。所以另外一个传输工具(例如,Logstash 或自定义的 Kafka 消费者)可以进一步丰富和转发。这里假设选择的下游传输工具能够满足我们对功能和性能的要求。

3.Fluentd:

也有很多公司的日志系统使用的是EFK,可以到网上查询相应的资料。

https://www.digitalocean.com/community/tutorials/elasticsearch-fluentd-and-kibana-open-source-log-search-and-visualization

Fluentd 创建的初衷主要是尽可能的使用 JSON 作为日志输出,所以传输工具及其下游的传输线不需要猜测子字符串里面各个字段的类型。这样,它为几乎所有的语言都提供库,这也意味着,我们可以将它插入到我们自定义的程序中。

优势:

和多数 Logstash 插件一样,Fluentd 插件是用 Ruby 语言开发的非常易于编写维护。所以它数量很多,几乎所有的源和目标存储都有插件(各个插件的成熟度也不太一样)。这也意味这我们可以用 Fluentd 来串联所有的东西。

劣势:

因为在多数应用场景下,我们会通过 Fluentd 得到结构化的数据,它的灵活性并不好。但是我们仍然可以通过正则表达式,来解析非结构化的数据。尽管,性能在大多数场景下都很好,但它并不是最好的,和 syslog-ng 一样,它的缓冲只存在与输出端,单线程核心以及 Ruby GIL 实现的插件意味着它大的节点下性能是受限的,不过,它的资源消耗在大多数场景下是可以接受的。对于小的或者嵌入式的设备,可能需要看看 Fluent Bit,它和 Fluentd 的关系与 Filebeat 和 Logstash 之间的关系类似

典型应用场景

Fluentd 在日志的数据源和目标存储各种各样时非常合适,因为它有很多插件。而且,如果大多数数据源都是自定义的应用,所以可以发现用 fluentd 的库要比将日志库与其他传输工具结合起来要容易很多。特别是在我们的应用是多种语言编写的时候,即我们使用了多种日志库,日志的行为也不太一样。

 

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/923652
推荐阅读
相关标签
  

闽ICP备14008679号