# Logstash log configuration with grok. Note: this article links to parts of the official grok documentation and to posts by other bloggers.
This is the usual debug-style check that an ELK cluster is healthy: once Logstash is running you can type data in by hand, and the parsed, formatted result is printed back.
1. Start Logstash.
2. Once it is up, type data directly into the terminal.
3. Logstash processes the input and prints the result.
input {
  stdin {}
}
output {
  stdout {
    codec => rubydebug
  }
}
1. Start Logstash.
2. Once it is up, type data directly into the terminal.
3. The data is processed by Logstash, echoed back, and stored in the ES cluster.
input {
  stdin {}
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["10.3.145.14","10.3.145.56","10.3.145.57"]
    index => 'logstash-debug-%{+YYYY-MM-dd}'
  }
}
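The `%{+YYYY-MM-dd}` part of the index name is a sprintf date reference resolved from the event's timestamp. A quick way to preview the resulting index name for a given day (a sketch; `index_for` is my own helper, and for whole dates the Joda-style `YYYY-MM-dd` corresponds to strftime's `%Y-%m-%d`):

```python
from datetime import date

# Preview the index name that index => 'logstash-debug-%{+YYYY-MM-dd}'
# resolves to for a given event date.
def index_for(day):
    return day.strftime("logstash-debug-%Y-%m-%d")

print(index_for(date(2023, 8, 13)))  # → logstash-debug-2023-08-13
```

This makes it easy to see that one index per day is created, which is what Kibana index patterns like `logstash-debug-*` rely on.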
1. Logs are sent to Logstash over TCP port 8888.
2. The data is matched and parsed by grok.
3. The result is printed to the terminal and stored in ES.
input {
  tcp {
    port => 8888
  }
}
filter {
  grok {
    match => {"message" => "%{DATA:key} %{NUMBER:value:int}"}
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["10.3.145.14","10.3.145.56","10.3.145.57"]
    index => 'logstash-debug-%{+YYYY-MM-dd}'
  }
}
# yum install -y nc
# free -m |awk 'NF==2{print $1,$3}' |nc logstash_ip 8888
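The `%{DATA:key} %{NUMBER:value:int}` match can be sanity-checked outside Logstash with an ordinary regex; a rough Python equivalent (the regex is my own translation of the grok patterns, not taken from Logstash):

```python
import re

# Rough Python equivalents of grok's DATA (non-greedy any) and NUMBER patterns.
PATTERN = re.compile(r"^(?P<key>.*?) (?P<value>[+-]?\d+(?:\.\d+)?)$")

# A sample line in the shape `free -m | awk ...` would emit.
line = "Swap: 1024"
m = PATTERN.match(line)
print(m.group("key"), int(m.group("value")))  # → Swap: 1024
```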
1. Logs are sent to Logstash over TCP port 8888.
2. The data is matched and parsed by grok.
3. The result is printed to the terminal.
input {
  tcp {
    port => 8888
  }
}
filter {
  grok {
    match => {"message" => "%{WORD:username}\:%{WORD:passwd}\:%{INT:uid}\:%{INT:gid}\:%{DATA:describe}\:%{DATA:home}\:%{GREEDYDATA:shell}"}
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
# cat /etc/passwd | nc logstash_ip 8888
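Before piping /etc/passwd through nc, the grok pattern can be checked as a plain regex; a rough Python equivalent (WORD ≈ `\w+`, INT ≈ `\d+`, DATA ≈ `.*?`, GREEDYDATA ≈ `.*` are my own translations):

```python
import re

# Python equivalents of the grok pattern above, with the same field names.
PASSWD = re.compile(
    r"^(?P<username>\w+):(?P<passwd>\w+):(?P<uid>\d+):(?P<gid>\d+)"
    r":(?P<describe>.*?):(?P<home>.*?):(?P<shell>.*)$"
)

m = PASSWD.match("root:x:0:0:root:/root:/bin/bash")
print(m.groupdict())
```

Each named group here corresponds to one field grok would add to the event.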
1. In the Filebeat config, point output.kafka at the Kafka cluster IPs and a topic.
2. In the Logstash config, add a kafka input with the cluster IPs and the same topic.
3. Clean the data with a Logstash filter.
4. Store the data in the target ES index via output.
5. Add the ES index in Kibana to visualize the data.
input {
  kafka {
    type => "audit_log"
    codec => "json"
    topics => "haha"
    #decorate_events => true
    #enable_auto_commit => true
    auto_offset_reset => "earliest"
    bootstrap_servers => ["192.168.52.129:9092,192.168.52.130:9092,192.168.52.131:9092"]
  }
}
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG} %{QS:x_forwarded_for}"}
  }
  date {
    match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
  }
  geoip {
    source => "lan_ip"
  }
}
output {
  if [type] == "audit_log" {
    stdout {
      codec => rubydebug
    }
    elasticsearch {
      hosts => ["192.168.52.129","192.168.52.130","192.168.52.131"]
      index => 'tt-%{+YYYY-MM-dd}'
    }
  }
}
# filebeat configuration
filebeat.prospectors:
- input_type: log
  paths:
    - /opt/logs/server/nginx.log
  json.keys_under_root: true
  json.add_error_key: true
  json.message_key: log
output.kafka:
  hosts: ["10.3.145.41:9092","10.3.145.42:9092","10.3.145.43:9092"]
  topic: 'nginx'
# nginx configuration
log_format main '{"user_ip":"$http_x_real_ip","lan_ip":"$remote_addr","log_time":"$time_iso8601","user_req":"$request","http_code":"$status","body_bytes_sents":"$body_bytes_sent","req_time":"$request_time","user_ua":"$http_user_agent"}';
access_log /var/log/nginx/access.log main;
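The nginx `log_format` above writes each access record as one JSON object, which is why the Filebeat side sets `json.keys_under_root` and related options. A quick check that a line in that shape parses cleanly (the field values below are invented):

```python
import json

# A sample line in the shape produced by the nginx log_format above.
line = ('{"user_ip":"-","lan_ip":"192.168.52.1","log_time":"2023-08-13T08:30:45+00:00",'
        '"user_req":"GET /index.html HTTP/1.1","http_code":"200","body_bytes_sents":"1234",'
        '"req_time":"0.005","user_ua":"curl/7.61.1"}')

record = json.loads(line)
print(record["http_code"], record["user_req"])
```

If a line fails `json.loads`, Filebeat would flag it via `json.add_error_key`, so keeping the `log_format` strictly valid JSON matters.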
1. Pull local log data directly into Logstash.
2. Process the logs and store them in ES.
input {
  file {
    type => "nginx-log"
    path => "/var/log/nginx/error.log"
    start_position => "beginning"
  }
}
filter {
  grok {
    match => { "message" => '%{DATESTAMP:date} \[%{WORD:level}\] %{DATA:msg} client: %{IPV4:cip},%{DATA}"%{DATA:url}"%{DATA}"%{IPV4:host}"'}
  }
  date {
    match => [ "timestamp" , "dd/MMM/YYYY:HH:mm:ss Z" ]
  }
}
output {
  if [type] == "nginx-log" {
    elasticsearch {
      hosts => ["192.168.249.139:9200","192.168.249.149:9200","192.168.249.159:9200"]
      index => 'logstash-audit_log-%{+YYYY-MM-dd}'
    }
  }
}
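The grok line above can be prototyped as an ordinary regex before it goes into the config; a simplified Python sketch (my own regex and an invented sample line, not the exact grok semantics):

```python
import re

# Simplified version of the error-log grok above: timestamp, level, message, client IP.
ERRLOG = re.compile(
    r"(?P<date>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) \[(?P<level>\w+)\] "
    r"(?P<msg>.*?), client: (?P<cip>\d{1,3}(?:\.\d{1,3}){3})"
)

line = ('2023/08/13 08:30:45 [error] 1234#0: *5 open() failed'
        ', client: 192.168.1.100, server: localhost')
m = ERRLOG.search(line)
print(m.group("level"), m.group("cip"))  # → error 192.168.1.100
```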
input {
  beats {
    port => 5000
  }
}
filter {
  grok {
    match => {"message" => "%{IPV4:cip}"}
  }
}
output {
  elasticsearch {
    hosts => ["192.168.249.139:9200","192.168.249.149:9200","192.168.249.159:9200"]
    index => 'test-%{+YYYY-MM-dd}'
  }
  stdout {
    codec => rubydebug
  }
}
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
output.logstash:
  hosts: ["192.168.52.134:5000"]
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
output.kafka:
  hosts: ["192.168.52.129:9092","192.168.52.130:9092","192.168.52.131:9092"]
  topic: haha
  partition.round_robin:
    reachable_only: true
  required_acks: 1
Logstash has three configuration sections: input, filter, and output. input manages the data coming into Logstash, filter does the filtering and processing, and output sends the data on.
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b
POSINT \b(?:[1-9][0-9]*)\b
NONNEGINT \b(?:[0-9]+)\b
WORD \b\w+\b
NOTSPACE \S+
SPACE \s*
DATA .*?
GREEDYDATA .*
QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}
# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])
IP (?:%{IPV6}|%{IPV4})
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
HOST %{HOSTNAME}
IPORHOST (?:%{HOSTNAME}|%{IP})
HOSTPORT %{IPORHOST}:%{POSINT}
# paths
PATH (?:%{UNIXPATH}|%{WINPATH})
UNIXPATH (?>/(?>[\w_%!$@:.,-]+|\\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]+(\+[A-Za-z+]+)?
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?
# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])
# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)
# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[PMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}
# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG (?:[\w._/%-]+)
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}
# Shortcuts
QS %{QUOTEDSTRING}
# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}
# Log Levels
LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
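The pattern file above is just a table of named regexes: a `%{NAME:field}` reference expands recursively until only plain regex is left. A toy sketch of that expansion (my own re-implementation covering only a few simple entries, since Python's `re` cannot handle the atomic groups some real grok patterns use):

```python
import re

# A tiny subset of the pattern table above.
PATTERNS = {
    "INT": r"(?:[+-]?(?:[0-9]+))",
    "WORD": r"\b\w+\b",
    "DATA": r".*?",
}

def compile_grok(expr):
    """Expand %{NAME} / %{NAME:field} references into one Python regex."""
    def repl(m):
        name, _, field = m.group(1).partition(":")
        body = compile_grok(PATTERNS[name])  # recurse: patterns may reference patterns
        return f"(?P<{field}>{body})" if field else f"(?:{body})"
    return re.sub(r"%\{([^}]+)\}", repl, expr)

rx = re.compile(compile_grok(r"%{DATA:key} %{INT:value}") + "$")
m = rx.match("Swap: 1024")
print(m.group("key"), m.group("value"))  # → Swap: 1024
```

This is essentially what Logstash does at startup when it compiles a grok `match` expression.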
USERNAME [a-zA-Z0-9._-]+: matches a username made of letters, digits, dots, underscores, and hyphens.
Examples: alice_123, john.doe, user-name
USER %{USERNAME}: an alias for the USERNAME pattern.
Examples: user123, jdoe, some_user
INT (?:[+-]?(?:[0-9]+)): matches an integer with an optional sign.
Examples: 123, -456, 789
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+))): matches a base-10 number, including decimals.
Examples: 123, -456.789
NUMBER (?:%{BASE10NUM}): an alias for the BASE10NUM pattern.
Examples: 123, -456.789
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+)): matches a hexadecimal number, with optional sign and 0x prefix.
Examples: 0x1A, -0x2F
BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b: matches a hexadecimal float, with optional sign, 0x prefix, and fractional part.
Examples: 0x1A, -0x2F.5
POSINT \b(?:[1-9][0-9]*)\b: matches a positive integer.
Examples: 123, 456
NONNEGINT \b(?:[0-9]+)\b: matches a non-negative integer.
Examples: 0, 42
WORD \b\w+\b: matches a run of word characters.
Examples: hello, world123
NOTSPACE \S+: matches a run of non-whitespace characters.
Example: ThisIsSomeText
SPACE \s*: matches whitespace (possibly empty).
Example: (whitespace)
DATA .*?: matches any characters, non-greedily.
Example: Some data here.
GREEDYDATA .*: matches any characters, greedily.
Example: This is a long text...
QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``)): matches a string in double, single, or back quotes.
Examples: "Hello, world!", 'This is a test.'
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}: matches a UUID (universally unique identifier).
Example: 123e4567-e89b-12d3-a456-426655440000
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC}): matches MAC addresses in common, Windows, and Cisco notation.
Examples: 00:1A:2B:3C:4D:5E, AA-BB-CC-DD-EE-FF, 001A.2B3C.4D5E
IPV6 ...: matches an IPv6 address.
Example: 2001:0db8:85a3:0000:0000:8a2e:0370:7334
IPV4 ...: matches an IPv4 address.
Example: 192.168.1.100
HOSTNAME ...: matches a hostname.
Examples: example.com, subdomain.example.net
HOSTPORT %{IPORHOST}:%{POSINT}: matches a host plus port number.
Examples: example.com:8080, localhost:80
URIPATH ...: matches the path part of a URI.
Example: /path/to/resource
MONTH ...: matches month names, abbreviated or spelled out.
Examples: Jan, December
MONTHNUM ...: matches a month as a number.
Examples: 01, 12
DAY ...: matches weekday names, abbreviated or spelled out.
Example: Mon
YEAR ...: matches a year.
Example: 2023
HOUR ...: matches an hour.
Examples: 01, 23
MINUTE ...: matches minutes.
Examples: 05, 50
SECOND ...: matches seconds.
Examples: 15, 59
DATE ...: matches a date in US (MM/DD/YYYY) or EU (DD.MM.YYYY) order.
Examples: 08/13/2023, 13.08.2023
DATESTAMP_RFC822 ...: matches an RFC 822 style date and time.
Example: Mon Aug 15 2023 12:34:56 UTC
DATESTAMP_RFC2822 ...: matches an RFC 2822 style date and time.
Example: Tue, 16 Aug 2023 15:45:30 +0300
SYSLOGTIMESTAMP ...: matches a syslog timestamp.
Example: Aug 13 23:59:59
SYSLOGPROG ...: matches a syslog program name and process ID.
Example: program[12345]
SYSLOGHOST ...: matches a syslog host name or IP.
Example: 192.168.1.100
SYSLOGFACILITY ...: matches a syslog facility and priority.
Example: <4.2>
HTTPDATE ...: matches an HTTP-style date and time.
Example: 13/Aug/2023:08:30:45 +0000
QS %{QUOTEDSTRING}: matches a quoted string; same as QUOTEDSTRING.
Example: "Quoted text"
SYSLOGBASE ...: matches the basic syslog prefix: timestamp, facility, host, and program.
Example: Aug 13 23:59:59 hostname program[12345]:
COMMONAPACHELOG ...: matches the common Apache log format: client IP, ident, auth, timestamp, request, response status, and bytes sent.
Example: 192.168.1.100 - user [13/Aug/2023:08:30:45 +0000] "GET /index.html HTTP/1.1" 200 1234
COMBINEDAPACHELOG ...: matches the combined Apache log format: the common format plus referrer and user agent.
Example: 192.168.1.100 - user [13/Aug/2023:08:30:45 +0000] "GET /index.html HTTP/1.1" 200 1234 "http://referrer.com" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36"
LOGLEVEL ...: matches log levels in their various spellings, cases, and abbreviations.
Examples: DEBUG, WARN, ERR
…
1. Write the custom patterns into a file:
[root@localhost ~]# mkdir /data/elk/logstash/regex/
[root@localhost ~]# vim /data/elk/logstash/regex/patterns
ID [0-9]{3,6}$
2. Configure Logstash:
[root@localhost ~]# cat /data/elk/logstash/conf.d/test.conf
filter {
  grok {
    patterns_dir => "/data/elk/logstash/regex/patterns"   # path to the custom pattern file
    match => {
      "message" => "%{IP:client_ip} %{WORD:request_type} %{URIPATHPARAM:url} %{NUMBER:bytes} %{NUMBER:response_time} %{ID:id}"   # uses our custom ID pattern
    }
  }
}
3. Reload Logstash:
[root@localhost ~]# ps aux | grep logstash | grep -v grep | awk '{print $2}' | xargs kill -HUP
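The custom ID pattern is itself just a regex, so it can be checked locally before wiring it into `patterns_dir` and reloading Logstash (the sample strings below are invented):

```python
import re

# The custom ID pattern from the patterns file: 3 to 6 digits at end of line.
ID = re.compile(r"[0-9]{3,6}$")

for sample in ["order 482913", "user 42", "req 007"]:
    m = ID.search(sample)
    print(sample, "->", m.group(0) if m else "no match")
```

Note that "user 42" fails because the `{3,6}` quantifier requires at least three digits; this is exactly the kind of edge case worth catching before a reload cycle.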
Install nc to feed data into Logstash through the listening port; you can also write a script that pushes values in.
1. Read from standard input and write to standard output.
2. Read from standard input and write to the ES cluster.
3. Read data over TCP and write to the ES cluster.
A log is the information a process emits while it runs.
System logs: messages, lastlog, maillog, yum.log, boot.log
Application logs: access.log, mysqld.log, error.log, redis, zabbix-server/-agent, rabbitmq, in-house applications
Audit logs: access.log, order logs, refund logs
Diagnostic logs: error.log, mysqld.log
Processing is done in the filter section.