当前位置:   article > 正文

基于flume+kafka+logstash+es的分布式日志系统_flume kafka es

flume kafka es

本文将从以下几点讲解的分布式日志系统

1.日志埋点

2.日志收集

3.日志处理分析

4.日志查询展示

 

先看一下日志数据流程图:

flume监听日志文件收集每行日志发到kafka,logstash消费kafka中的消息将日志解析成json插入到es,es提供日志查询

1.日志埋点

由于我们项目结构是网关+dubbo服务实例的分布式系统,埋点主要在http的网关和rpc的服务实例。

网关我们在BaseActionServlet类中记录了请求和响应日志。在一个请求进来的时候我们创建一个上下文对象DistributedContext保存当前线程的logId(唯一)及一些业务相关参数,在rpc调用的时候将这个上下文放在dubbo的RpcContext中在整个调用链中传递,使整个调用链能够通过logId串联。

ConsumerSetContextFilter

1

2

3

4

5

6

7

8

9

10

@Activate(group = Constants.CONSUMER)

public class ConsumerSetContextFilter implements Filter {

    @Override

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        // 消费者将上下文对象放在RpcContext中

        RpcContext.getContext().setAttachment("context", ObjectUtil.toString(DistributedContext.getContext()));

        Result result = invoker.invoke(invocation);

        return result;

    }

}

ProviderSetContextFilter

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

@Activate(group = Constants.PROVIDER)

public class ProviderSetContextFilter implements Filter {

    @Override

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        DistributedContext context = null;

        // 提供者从RpcContext中取出上下文对象

        String contextJson = RpcContext.getContext().getAttachment("context");

        if (StringUtil.isNullOrBlank(contextJson)) {

            DistributedContext.start();

            context = DistributedContext.getContext();

        } else {

            context = GsonUtil.fromJson(contextJson, DistributedContext.class);

        }

        // 将上下文对象放在当前线程ThreadLocal中

        DistributedContext.setContext((DistributedContext)context);

        Result result = invoker.invoke(invocation);

        return result;

    }

}

 

 

2.日志收集

日志收集我们用的是apach的flume: http://flume.apache.org/FlumeUserGuide.html ,  它的source+channel+sink的设计非常适合日志收集

Agent component diagram

                                                            flume数据模型

完全与业务系统解耦,flume宕机不影响系统运行。

我们在每台宿主机中装上flume,配置好agent的source channel 和 sink:

a1.sources = r1 r2 r3 
a1.sinks = k1
a1.channels = c1
#source配置监听文件
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /data/xx/flume_taildir_position/gateway.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /data/xx/gateway/gateway.log
a1.sources.r2.type = TAILDIR
a1.sources.r2.positionFile = /data/xx/flume_taildir_position/demo.json
a1.sources.r2.filegroups = f2
a1.sources.r2.filegroups.f2 = /data/xx/demo/demo.log
a1.sources.r3.type = TAILDIR
a1.sources.r3.positionFile = /data/xx/flume_taildir_position/authority.json
a1.sources.r3.filegroups = f3
a1.sources.r3.filegroups.f3 = /data/xx/authority/authority.log
#channel配置
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /data/soft/apache-flume-1.7.0-bin/channel_dir/checkpoint
a1.channels.c1.dataDirs = /data/soft/apache-flume-1.7.0-bin/channel_dir/data
#sink配置,将数据丢到kafka
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList=10.200.xxx.xxx:9092
a1.sinks.k1.topic=dcv6_log
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
#source与channel绑定
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
#channel与sink绑定
a1.sinks.k1.channel = c1

flume的TailDir收集方式支持断点续传,他会将监听的文件inode值与行数持久化在文件中,重启后也可以从上一次监听的位置开始收集。

如果kafka宕机,flume也会将数据持久化到channel配置的file中,这样可以很好的保证数据不会轻易丢失。

 

 

3.日志处理分析

logstash可以将日志格式化为json,方便我们处理日志的存取。因此用logstash消费kafka中的数据保存到es中。

如果logstash消费速度小于日志生成的速度,只需再起一个logstash进程去消费即可,这个拓展性要好一点。

logstash配置如下:

input {
        kafka {
                zk_connect => "10.200.100.xx1:2181,10.200.100.xx2:2181,10.200.100.xx3:2181"
                group_id => "g1" 
                topic_id => "dcv6_log"
                fetch_message_max_bytes => 20480000
                reset_beginning => false # boolean (optional), default: false  
                consumer_threads => 20  # number (optional), default: 1  
                decorate_events => true # boolean (optional), default: false  
                type => "ORTHER"
                codec => plain
        }
}
filter{
        grok {
                match => {"message" => "\[(?<createTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\] \[%{DATA:level}\] \[%{DATA:className}\] \[%{DATA:methodName}\] \[%{DATA:thread}\] \[%{GREEDYDATA:msg}\] \[%{NUMBER:clientType:int}\] \[%{NUMBER:step:int}\] \[%{NUMBER:flag:int}\] \[%{DATA:ip}\] \[%{DATA:clientIp}\] \[%{NUMBER:costTime:int}\] \[%{DATA:isErr}\] \[%{DATA:errName}\] \[%{DATA:logId}\] \[%{DATA:sysName}\] \[%{DATA:actionName}\] \[%{DATA:apiName}\]"}
                remove_field => ["message"]   #分析完之后,原始数据就可以被删掉了。原始数据占用的字段叫做message。
                remove_field => "kafka"
        }
        if [actionName] == "xx.log.getReqLog" {  
                drop {}  
                }  
        if [actionName] == "xx.log.getBusinessLog" {  
                        drop {}  
                }  
        if[flag]==1{
                mutate { replace => { type => "REQUEST" } }
        } else if[flag]==2{
                mutate { replace => { type => "RESPONSE" } }
        } else if[flag]==3{
                mutate { replace => { type => "EXCEPTION" } }
        } else if[flag]==4{
                mutate { replace => { type => "BUSINESS" } }
        } else if[flag]==5{
                mutate { replace => { type => "INNER_API" } }
        } else {
                mutate { replace => { type => "OTHER" } }
        }
        date {
                match => [ "createTime", "yyyy-MM-dd HH:mm:ss.SSS" ]
                locale => "cn"
                timezone=>"+00:00"
        }
}
output {
        elasticsearch { hosts => ["10.200.100.xx6:9200"]
                index=>"dcv6-%{+YYYY.MM.dd}" 
        } 
        #stdout { codec => rubydebug }
}

 

 

4.日志查询展示

这个利用es的java api即可编写日志查询接口。

 

 

目前该分布式日志系统中平均每天2亿条日志(100+G)的数据量,高峰期日志消费速率会延迟10分钟左右。

es的查询较慢,可以考虑做下集群。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/975811
推荐阅读
相关标签
  

闽ICP备14008679号