赞
踩
这里以下面需求为例,描述如何使用ELK做数据分析
示例需求:从线上down下来的nginx日志文件要求分析,并出图表,找到慢且访问量大的接口
前置要去:需要读者基本了解Elasticsearch和Kibana
首先需要安装Elasticsearch以及Logstash和Kibana。Elasticsearch相当于数据库,Kibana用于数据视图,Logstash主要作为转换、输入、过滤的一个工具。
软件的安装这里就不过多描述了。这里主要讲一下如何操作去导入nginx日志文件到ELK中分析。
首先,nginx日志文件的配置如下,nginx日志文件什么格式都可以,以下只是用作示范
log_format main '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for" "$request_time" "$upstream_response_time"';
基本来说,这个nginx日志文件配置只比默认的多 r e q u e s t t i m e 和 request_time和 requesttime和upstream_response_time。那么logstash如何配置呢。
logstash的配置文件在config目录下xxx.config,基本格式如下,包含了输入、过滤、输出
input {
}
filter {
}
output {
}
输入就是数据的来源,这里直接填些文件的路径,输入配置如下,这里的path后面跟的就是文件地址,start_position表示从头读取文件,sincedb_path是logstash用于记录数据的一个偏移量,传输到哪里了,比如传输一个文件,传送完了,就会在sincedb的文件写入位置,下一次再运行就不会重复传数据。这里的sincedb_path我们直接自己新建了一个sincedb文件,然后配置指向责怪文件。
input {
file {
# 读取文件的路径, 基于 glob 匹配语法
path => "/down/nginx-2022-11-14.log"
# beginning/end 是否从头读取文件, 默认从尾部读取
start_position => "beginning"
sincedb_path => "/down/logstash-8.1.0-windows-x86_64/logstash-8.1.0/data/sincedb"
}
}
filter配置用于配置过滤,字段转化功能,是核心配置。这里我们使用自定义转换,使用grok表达式去把nginx的日志格式转化为Elasticsearch的数据。
grok表达式相当于一个正则,用于匹配原始数据的一行的格式,然后按表达式转换成key,value的一个数据。举个例子
//nginx 源日志
"GET /xxx/xxx?s=a HTTP/1.1" 200
//下面是对应的grok表达式
\"%{WORD:request_method} %{DATA:uri}\?%{DATA:param} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code}
接下来我们来分析上面这个例子,首先nginx源日志"GET /xxx/xxx?s=a HTTP/1.1"
这里开头和结尾有个冒号,这里就需要使用\"
加个反斜杠来匹配,然后是GET,GET表示请求的method,后面有个空格,所以这个字段使用%{WORD:request_method}
来匹配,到时候输入到es里面的就是request_method这个字段对应的数据。GET后面带个空格,所以grop后面也要加个空格,然后后面的字符是/xxx/xxx?s=a,表示一个路径加参数,这里使用%{DATA:uri}\?%{DATA:param}
来匹配,中间的?号是需要加\转义的,把url切分成路径和参数。当然有些url是不带参数的,这里需要用%{DATA:uri}
来匹配,同理,后面的HTTP/1.1" 200
,就用 HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code}
来匹配了。
而对于上文说到的nginx格式和完整的grok表达式如下:
//nginx日志格式
log_format main '$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for" "$request_time" "$upstream_response_time"';
//grok完整表达式
%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri}\?%{DATA:param} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" \"%{NUMBER:request_time:float}\" \"%{NUMBER:response_time:float}\"
一个一个字符去扣,基本就能明白如何使用grok表达式了。
grok表达式本身比较复杂,为了效率,debug功能就非常的必要了,可以节省很多排查配置的时间,grok的debug功能在kiabana有。首先在左边栏里点击这个dev tools,
然后在上边栏可以找到grok debugger,这里Sample Data为原始数据,Grok Pattern为待debug的grok表达式,填写好后,点击simulate,会在Structured Data处生成解析后的结果
我们以一条nginx数据为例,nginx数据如下
172.11.11.11 - - [08/Nov/2022:00:00:02 +0800] "GET /xxsdada/asdasd?a=123&b=123 HTTP/1.1" 200 1194 "https://xxxx.xxx.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36" "2804:30c:1854:d500:5926:3f37:456e:2176" "0.5" "0.5"
提交后如图所示,grok表达式把nginx的日志行数据转换成了结构化数据。这就代表grok表达式没有问题,如果出现错误的情况,直接调整grok表达式后重试
对于nginx日志,并不是每一行日志都有的,那么如何处理?
比如上面的日志
r
e
q
u
e
s
t
t
i
m
e
和
request_time和
requesttime和upstream_response_time,可能在日志里没有,这时候nginx输出的日志是 “-” “-”,nginx这里使用了横杠-来表示空值,对于我们的配置%{NUMBER:response_time:float},'-'是无法转换为NUMBER类型的,这里就会转换出错。这里就需要用到多个grok表达式了,logstash可以使用多个grok表达式来转换数据,默认是使用第一个,如果第一个过滤失败了,就会默认往后推。那我们只需要去除%{NUMBER:response_time:float}这段后生成第二个grok表达式,就能兼容上述的场景了。
对于单个表达式,配置方法如下
filter {
grok {
match => {
"message" => "xxxxxxxxxx//grok表达式"}
}
# 通过date插件,把nginx日志中的时间戳用作logstash的event时间戳
date {
match => [ "time_local", "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
对于多个表达式,配置方法如下
filter {
grok {
match => {
"message" => ["xxxxxxxxxx//grok表达式1","xxxxxxxxxx//grok表达式2","xxxxxxxxxx//grok表达式3"}
}
# 通过date插件,把nginx日志中的时间戳用作logstash的event时间戳
date {
match => [ "time_local", "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
配置如下,仅作示范,也有一些情况可能没有兼容
filter {
grok {
match => {
"message" =>
[
"%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri}\?%{DATA:param} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" \"%{NUMBER:request_time:float}\" \"%{NUMBER:response_time:float}\"",
"%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri}\?%{DATA:param} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" \"%{NUMBER:request_time:float}\"",
"%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri}\?%{DATA:param} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" ",
"%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" \"%{NUMBER:request_time:float}\" \"%{NUMBER:response_time:float}\"",
"%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" \"%{NUMBER:request_time:float}\"",
"%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" ",
"%{HTTPD_COMBINEDLOG}"
]}
}
# 通过date插件,把nginx日志中的时间戳用作logstash的event时间戳
date {
match => [ "time_local", "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
因为这里的数据要存在es中,所以这里直接把output配置es的端口
output {
# 输出到 ES 中
elasticsearch {
hosts => ["127.0.0.1:9200","127.0.0.1:9300","127.0.0.1:9400"]
index => "nginx-log-test"
}
}
这里hosts后面就是es或者es集群的ip端口,index为要存入的索引(即数据库表名),这里无需手动创建索引,logstash会自动帮我们创建。
至此,logstash的配置就完成了,合在一起如下,存入logstash.conf文件中
input {
file {
# 读取文件的路径, 基于 glob 匹配语法
path => "/down/xxxxxx.log"
# beginning/end 是否从头读取文件, 默认从尾部读取
start_position => "beginning"
sincedb_path => "/down/logstash-8.1.0-windows-x86_64/logstash-8.1.0/data/sincedb"
}
}
filter {
grok {
match => {
"message" =>
[
"%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri}\?%{DATA:param} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" \"%{NUMBER:request_time:float}\" \"%{NUMBER:response_time:float}\"",
"%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri}\?%{DATA:param} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" \"%{NUMBER:request_time:float}\"",
"%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri}\?%{DATA:param} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" ",
"%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" \"%{NUMBER:request_time:float}\" \"%{NUMBER:response_time:float}\"",
"%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" \"%{NUMBER:request_time:float}\"",
"%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time_local}\] \"%{WORD:request_method} %{DATA:uri} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent_bytes} \"%{DATA:http_referrer}\" \"%{DATA:http_user_agent}\" ",
"%{HTTPD_COMBINEDLOG}"
]}
}
# 通过date插件,把nginx日志中的时间戳用作logstash的event时间戳
date {
match => [ "time_local", "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}
output {
# 输出到 ES 中, 自动按天创建索引
elasticsearch {
hosts => ["127.0.0.1:9200","127.0.0.1:9300","127.0.0.1:9400"]
index => "nginx-local"
}
}
这里以我的安装目录为例首先先运行检查程序
\down\logstash-8.1.0-windows-x86_64\logstash-8.1.0\bin\logstash --config.test_and_exit -f \down\logstash-8.1.0-windows-x86_64\logstash-8.1.0\config\logstash.conf
如果配置ok,那么输出会提示配置ok
然后执行下面命令启动logstash,对于导入静态文件这种一次性的数据导入,启动完成导入数据后可以关闭logstash,节约资源。
\down\logstash-8.1.0-windows-x86_64\logstash-8.1.0\bin\logstash -f \down\logstash-8.1.0-windows-x86_64\logstash-8.1.0\config\logstash.conf
上传完数据后,可以在Management-Data-Index Management上看见刚上传的索引信息。然后在在Management-Kibana-Data Views上Create data view,名字使用刚才配置的索引名nginx-local。配置完成后,在侧边栏的Analytics-Discover就可以看见刚才导入的数据行,可以对数据进行搜索等操作了。
侧边栏点击Analytics-Dashboard,然后点击Create dashboard,然后点击Create visualization,
然后左边数据视图就选刚才创建的nginx-local,时间可以选全部,然后图表类型按需要选择
水平轴Horizontal axis配置,这用uri作为水平轴Rank by先用默认的。然后点击X自动保存。
然后点击垂直轴Vertical axis,这里选用responseTime作纵轴,可以看到,这个聚合数据可以选择中位数,最大,最小,计数,和,最新值等聚合方式,我这里选择中位数,比较能体现。然后点X自动保存。
因为这里数据分析我们还想要知道一个路径具体的请求数是多少,所以我们再在纵轴加一个维度,这里直接统计数量,用count+record,然后轴选则放在右边
完成配置如下
可以看到url太长了没法完全显示,这时候我们可以调整图的类型
使用横向图并把请求数量的轴调到top,这样图就很直观了,然后找到请求比较多,响应又比较慢的接口就比较容易了。
最后右上角点击
然后右上角点击
然后输入名字,保存,下次分析就不用建图了,直接就能看见
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。