赞
踩
本文将从以下几点讲解的分布式日志系统
1.日志埋点
2.日志收集
3.日志处理分析
4.日志查询展示
先看一下日志数据流程图:
flume监听日志文件收集每行日志发到kafka,logstash消费kafka中的消息将日志解析成json插入到es,es提供日志查询
由于我们项目结构是网关+dubbo服务实例的分布式系统,埋点主要在http的网关和rpc的服务实例。
网关我们在BaseActionServlet类中记录了请求和响应日志。在一个请求进来的时候我们创建一个上下文对象DistributedContext保存当前线程的logId(唯一)及一些业务相关参数,在rpc调用的时候将这个上下文放在dubbo的RpcContext中在整个调用链中传递,使整个调用链能够通过logId串联。
ConsumerSetContextFilter
1 2 3 4 5 6 7 8 9 10 |
|
ProviderSetContextFilter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
|
日志收集我们用的是apach的flume: http://flume.apache.org/FlumeUserGuide.html , 它的source+channel+sink的设计非常适合日志收集
flume数据模型
完全与业务系统解耦,flume宕机不影响系统运行。
我们在每台宿主机中装上flume,配置好agent的source channel 和 sink:
a1.sources = r1 r2 r3 a1.sinks = k1 a1.channels = c1 #source配置监听文件 a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /data/xx/flume_taildir_position/gateway.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /data/xx/gateway/gateway.log a1.sources.r2.type = TAILDIR a1.sources.r2.positionFile = /data/xx/flume_taildir_position/demo.json a1.sources.r2.filegroups = f2 a1.sources.r2.filegroups.f2 = /data/xx/demo/demo.log a1.sources.r3.type = TAILDIR a1.sources.r3.positionFile = /data/xx/flume_taildir_position/authority.json a1.sources.r3.filegroups = f3 a1.sources.r3.filegroups.f3 = /data/xx/authority/authority.log #channel配置 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /data/soft/apache-flume-1.7.0-bin/channel_dir/checkpoint a1.channels.c1.dataDirs = /data/soft/apache-flume-1.7.0-bin/channel_dir/data #sink配置,将数据丢到kafka a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.brokerList=10.200.xxx.xxx:9092 a1.sinks.k1.topic=dcv6_log a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder #source与channel绑定 a1.sources.r1.channels = c1 a1.sources.r2.channels = c1 a1.sources.r3.channels = c1 #channel与sink绑定 a1.sinks.k1.channel = c1
flume的TailDir收集方式支持断点续传,他会将监听的文件inode值与行数持久化在文件中,重启后也可以从上一次监听的位置开始收集。
如果kafka宕机,flume也会将数据持久化到channel配置的file中,这样可以很好的保证数据不会轻易丢失。
logstash可以将日志格式化为json,方便我们处理日志的存取。因此用logstash消费kafka中的数据保存到es中。
如果logstash消费速度小于日志生成的速度,只需再起一个logstash进程去消费即可,这个拓展性要好一点。
logstash配置如下:
input { kafka { zk_connect => "10.200.100.xx1:2181,10.200.100.xx2:2181,10.200.100.xx3:2181" group_id => "g1" topic_id => "dcv6_log" fetch_message_max_bytes => 20480000 reset_beginning => false # boolean (optional), default: false consumer_threads => 20 # number (optional), default: 1 decorate_events => true # boolean (optional), default: false type => "ORTHER" codec => plain } } filter{ grok { match => {"message" => "\[(?<createTime>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\] \[%{DATA:level}\] \[%{DATA:className}\] \[%{DATA:methodName}\] \[%{DATA:thread}\] \[%{GREEDYDATA:msg}\] \[%{NUMBER:clientType:int}\] \[%{NUMBER:step:int}\] \[%{NUMBER:flag:int}\] \[%{DATA:ip}\] \[%{DATA:clientIp}\] \[%{NUMBER:costTime:int}\] \[%{DATA:isErr}\] \[%{DATA:errName}\] \[%{DATA:logId}\] \[%{DATA:sysName}\] \[%{DATA:actionName}\] \[%{DATA:apiName}\]"} remove_field => ["message"] #分析完之后,原始数据就可以被删掉了。原始数据占用的字段叫做message。 remove_field => "kafka" } if [actionName] == "xx.log.getReqLog" { drop {} } if [actionName] == "xx.log.getBusinessLog" { drop {} } if[flag]==1{ mutate { replace => { type => "REQUEST" } } } else if[flag]==2{ mutate { replace => { type => "RESPONSE" } } } else if[flag]==3{ mutate { replace => { type => "EXCEPTION" } } } else if[flag]==4{ mutate { replace => { type => "BUSINESS" } } } else if[flag]==5{ mutate { replace => { type => "INNER_API" } } } else { mutate { replace => { type => "OTHER" } } } date { match => [ "createTime", "yyyy-MM-dd HH:mm:ss.SSS" ] locale => "cn" timezone=>"+00:00" } } output { elasticsearch { hosts => ["10.200.100.xx6:9200"] index=>"dcv6-%{+YYYY.MM.dd}" } #stdout { codec => rubydebug } }
这个利用es的java api即可编写日志查询接口。
目前该分布式日志系统中平均每天2亿条日志(100+G)的数据量,高峰期日志消费速率会延迟10分钟左右。
es的查询较慢,可以考虑做下集群。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。