赞
踩
目录
4.示例:设置Filebeat模块以使用Kafka和Logstash
Filebeat与预构建的模块打包在一起 ,其中包含收集,解析,丰富和可视化各种日志文件格式的数据所需的配置。每个Filebeat模块由一个或多个包含摄取节点管道的文件集,Elasticsearch模板,Filebeat输入配置和Kibana仪表板组成。
您可以将Filebeat模块与Logstash一起使用,但您需要进行一些额外的设置。最简单的方法是设置和使用 Filebeat提供的摄取管道。如果摄取管道不符合您的要求,您可以 创建Logstash配置以使用而不是摄取管道。
这两种方法都允许您使用Filebeat模块可用的配置,索引模板和仪表板,只要您维护索引和仪表板所需的字段结构即可。
将Filebeat模块与Logstash一起使用时,可以使用Filebeat提供的摄取管道来解析数据。您需要将管道加载到Elasticsearch并配置Logstash以使用它们。
在安装了Filebeat的系统上,运行setup
带有--pipelines
指定选项的命令 以加载特定模块的接收管道。例如,以下命令加载系统和nginx模块的摄取管道:
filebeat setup --pipelines --modules nginx,system
此安装步骤需要连接到Elasticsearch,因为Filebeat需要将摄取管道加载到Elasticsearch中。如有必要,可以在运行命令之前临时禁用已配置的输出并启用Elasticsearch输出。
在安装Logstash的系统上,创建一个Logstash管道配置,该配置从Logstash输入(如Beats或Kafka)读取,并将事件发送到Elasticsearch输出。在Elasticsearch输出中设置pipeline
选项 %{[@metadata][pipeline]}
以使用先前加载的摄取管道。
这是一个示例配置,它从Beats输入读取数据并使用Filebeat摄取管道来解析模块收集的数据:
- input {
- beats {
- port => 5044
- }
- }
-
- output {
- if [@metadata][pipeline] {
- elasticsearch {
- hosts => "https://061ab24010a2482e9d64729fdb0fd93a.us-east-1.aws.found.io:9243"
- manage_template => false
- index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
- pipeline => "%{[@metadata][pipeline]}"
- user => "elastic"
- password => "secret"
- }
- } else {
- elasticsearch {
- hosts => "https://061ab24010a2482e9d64729fdb0fd93a.us-east-1.aws.found.io:9243"
- manage_template => false
- index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
- user => "elastic"
- password => "secret"
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
将pipeline
选项设置为%{[@metadata][pipeline]}
。此设置将Logstash配置为根据事件中传递的元数据选择正确的摄取管道。
有关设置和运行模块的更多信息,请参阅Filebeat 模块文档。
有关完整示例,请参阅示例:设置Filebeat模块以使用Kafka和Logstash。
本节中的示例显示如何构建Logstash管道配置,以替换Filebeat模块提供的接收管道。管道获取Filebeat模块收集的数据,将其解析为Filebeat索引所期望的字段,并将字段发送到Elasticsearch,以便可以可视化Filebeat提供的预构建仪表板中的数据。
这种方法比使用现有的提取管道解析数据更耗时,但它可以让您更好地控制数据的处理方式。通过编写自己的管道配置,您可以在提取字段后执行其他处理,例如删除字段,或者可以将负载从Elasticsearch提取节点移动到Logstash节点。
在决定使用Logstash配置替换摄取管道之前,请阅读使用摄取管道进行分析。
以下是一些示例,说明如何实现Logstash配置以替换摄取管道:
Logstash提供了一个摄取管道转换工具 ,可帮助您将摄取管道定义迁移到Logstash配置。该工具目前不支持所有可用于摄取节点的处理器,但它是一个很好的起点。
此示例中的Logstash管道配置显示了如何发送和解析apache
Filebeat模块收集的访问和错误日志 。
- input {
- beats {
- port => 5044
- host => "0.0.0.0"
- }
- }
- filter {
- if [fileset][module] == "apache2" {
- if [fileset][name] == "access" {
- grok {
- match => { "message" => ["%{IPORHOST:[apache2][access][remote_ip]} - %{DATA:[apache2][access][user_name]} \[%{HTTPDATE:[apache2][access][time]}\] \"%{WORD:[apache2][access][method]} %{DATA:[apache2][access][url]} HTTP/%{NUMBER:[apache2][access][http_version]}\" %{NUMBER:[apache2][access][response_code]} %{NUMBER:[apache2][access][body_sent][bytes]}( \"%{DATA:[apache2][access][referrer]}\")?( \"%{DATA:[apache2][access][agent]}\")?",
- "%{IPORHOST:[apache2][access][remote_ip]} - %{DATA:[apache2][access][user_name]} \\[%{HTTPDATE:[apache2][access][time]}\\] \"-\" %{NUMBER:[apache2][access][response_code]} -" ] }
- remove_field => "message"
- }
- mutate {
- add_field => { "read_timestamp" => "%{@timestamp}" }
- }
- date {
- match => [ "[apache2][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
- remove_field => "[apache2][access][time]"
- }
- useragent {
- source => "[apache2][access][agent]"
- target => "[apache2][access][user_agent]"
- remove_field => "[apache2][access][agent]"
- }
- geoip {
- source => "[apache2][access][remote_ip]"
- target => "[apache2][access][geoip]"
- }
- }
- else if [fileset][name] == "error" {
- grok {
- match => { "message" => ["\[%{APACHE_TIME:[apache2][error][timestamp]}\] \[%{LOGLEVEL:[apache2][error][level]}\]( \[client %{IPORHOST:[apache2][error][client]}\])? %{GREEDYDATA:[apache2][error][message]}",
- "\[%{APACHE_TIME:[apache2][error][timestamp]}\] \[%{DATA:[apache2][error][module]}:%{LOGLEVEL:[apache2][error][level]}\] \[pid %{NUMBER:[apache2][error][pid]}(:tid %{NUMBER:[apache2][error][tid]})?\]( \[client %{IPORHOST:[apache2][error][client]}\])? %{GREEDYDATA:[apache2][error][message1]}" ] }
- pattern_definitions => {
- "APACHE_TIME" => "%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}"
- }
- remove_field => "message"
- }
- mutate {
- rename => { "[apache2][error][message1]" => "[apache2][error][message]" }
- }
- date {
- match => [ "[apache2][error][timestamp]", "EEE MMM dd H:m:s YYYY", "EEE MMM dd H:m:s.SSSSSS YYYY" ]
- remove_field => "[apache2][error][timestamp]"
- }
- }
- }
- }
- output {
- elasticsearch {
- hosts => localhost
- manage_template => false
- index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
此示例中的Logstash管道配置显示了如何发送和解析由mysql
Filebeat模块收集的错误和慢速日志日志。
- input {
- beats {
- port => 5044
- host => "0.0.0.0"
- }
- }
- filter {
- if [fileset][module] == "mysql" {
- if [fileset][name] == "error" {
- grok {
- match => { "message" => ["%{LOCALDATETIME:[mysql][error][timestamp]} (\[%{DATA:[mysql][error][level]}\] )?%{GREEDYDATA:[mysql][error][message]}",
- "%{TIMESTAMP_ISO8601:[mysql][error][timestamp]} %{NUMBER:[mysql][error][thread_id]} \[%{DATA:[mysql][error][level]}\] %{GREEDYDATA:[mysql][error][message1]}",
- "%{GREEDYDATA:[mysql][error][message2]}"] }
- pattern_definitions => {
- "LOCALDATETIME" => "[0-9]+ %{TIME}"
- }
- remove_field => "message"
- }
- mutate {
- rename => { "[mysql][error][message1]" => "[mysql][error][message]" }
- }
- mutate {
- rename => { "[mysql][error][message2]" => "[mysql][error][message]" }
- }
- date {
- match => [ "[mysql][error][timestamp]", "ISO8601", "YYMMdd H:m:s" ]
- remove_field => "[mysql][error][time]"
- }
- }
- else if [fileset][name] == "slowlog" {
- grok {
- match => { "message" => ["^# User@Host: %{USER:[mysql][slowlog][user]}(\[[^\]]+\])? @ %{HOSTNAME:[mysql][slowlog][host]} \[(IP:[mysql][slowlog][ip])?\](\s*Id:\s* %{NUMBER:[mysql][slowlog][id]})?\n# Query_time: %{NUMBER:[mysql][slowlog][query_time][sec]}\s* Lock_time: %{NUMBER:[mysql][slowlog][lock_time][sec]}\s* Rows_sent: %{NUMBER:[mysql][slowlog][rows_sent]}\s* Rows_examined: %{NUMBER:[mysql][slowlog][rows_examined]}\n(SET timestamp=%{NUMBER:[mysql][slowlog][timestamp]};\n)?%{GREEDYMULTILINE:[mysql][slowlog][query]}"] }
- pattern_definitions => {
- "GREEDYMULTILINE" => "(.|\n)*"
- }
- remove_field => "message"
- }
- date {
- match => [ "[mysql][slowlog][timestamp]", "UNIX" ]
- }
- mutate {
- gsub => ["[mysql][slowlog][query]", "\n# Time: [0-9]+ [0-9][0-9]:[0-9][0-9]:[0-9][0-9](\\.[0-9]+)?$", ""]
- }
- }
- }
- }
- output {
- elasticsearch {
- hosts => localhost
- manage_template => false
- index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
此示例中的Logstash管道配置显示了如何发送和解析nginx
Filebeat模块收集的访问和错误日志 。
- input {
- beats {
- port => 5044
- host => "0.0.0.0"
- }
- }
- filter {
- if [fileset][module] == "nginx" {
- if [fileset][name] == "access" {
- grok {
- match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] }
- remove_field => "message"
- }
- mutate {
- add_field => { "read_timestamp" => "%{@timestamp}" }
- }
- date {
- match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
- remove_field => "[nginx][access][time]"
- }
- useragent {
- source => "[nginx][access][agent]"
- target => "[nginx][access][user_agent]"
- remove_field => "[nginx][access][agent]"
- }
- geoip {
- source => "[nginx][access][remote_ip]"
- target => "[nginx][access][geoip]"
- }
- }
- else if [fileset][name] == "error" {
- grok {
- match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
- remove_field => "message"
- }
- mutate {
- rename => { "@timestamp" => "read_timestamp" }
- }
- date {
- match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]
- remove_field => "[nginx][error][time]"
- }
- }
- }
- }
- output {
- elasticsearch {
- hosts => localhost
- manage_template => false
- index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
此示例中的Logstash管道配置显示了如何发送和解析由system
Filebeat模块收集的系统日志 。
- input {
- beats {
- port => 5044
- host => "0.0.0.0"
- }
- }
- filter {
- if [fileset][module] == "system" {
- if [fileset][name] == "auth" {
- grok {
- match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
- "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
- "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
- "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
- "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}",
- "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][useradd][name]}, UID=%{NUMBER:[system][auth][useradd][uid]}, GID=%{NUMBER:[system][auth][useradd][gid]}, home=%{DATA:[system][auth][useradd][home]}, shell=%{DATA:[system][auth][useradd][shell]}$",
- "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
- pattern_definitions => {
- "GREEDYMULTILINE"=> "(.|\n)*"
- }
- remove_field => "message"
- }
- date {
- match => [ "[system][auth][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
- }
- geoip {
- source => "[system][auth][ssh][ip]"
- target => "[system][auth][ssh][geoip]"
- }
- }
- else if [fileset][name] == "syslog" {
- grok {
- match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
- pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
- remove_field => "message"
- }
- date {
- match => [ "[system][syslog][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
- }
- }
- }
- }
- output {
- elasticsearch {
- hosts => localhost
- manage_template => false
- index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
本节介绍如何在发布管道中的Filebeat和Logstash之间使用Kafka时设置Filebeat 模块以使用Logstash。此示例的主要目标是展示如何从Filebeat加载摄取管道并将其与Logstash一起使用。
本节中的示例显示了主题名称为硬编码的简单配置。有关配置选项的完整列表,请参阅有关配置Kafka输入插件的文档。另请参阅 在Filebeat Reference中配置Kafka输出。
如果尚未设置Filebeat索引模板和示例Kibana仪表板,请运行Filebeat setup
命令立即执行此操作:
filebeat -e setup
该-e
标志是可选的,并将输出发送到标准错误而不是syslog。
此一次性设置步骤需要连接到Elasticsearch和Kibana,因为Filebeat需要在Elasticsearch中创建索引模板并将示例仪表板加载到Kibana中。有关配置与Elasticsearch的连接的详细信息,请参阅Filebeat模块快速入门 。
加载模板和仪表板后,您将看到该消息INFO {kib} dashboards successfully loaded. Loaded dashboards
。
运行该modules enable
命令以启用要运行的模块。例如:
filebeat modules enable system
您可以通过编辑Filebeat modules.d
目录下的配置文件来进一步配置模块。例如,如果日志文件不在模块预期的位置,则可以设置该var.paths
选项。
setup
使用指定的--pipelines
和--modules
选项运行命令,以加载已启用模块的接收管道。此步骤还需要连接到Elasticsearch。如果要使用Logstash管道而不是摄取节点来解析数据,请跳过此步骤。
filebeat setup --pipelines --modules system
配置Filebeat以将日志行发送到Kafka。为此,在 filebeat.yml
配置文件中,通过注释掉它来禁用Elasticsearch输出,并启用Kafka输出。例如:
- #output.elasticsearch:
- #hosts: ["localhost:9200"]
- output.kafka:
- hosts: ["kafka:9092"]
- topic: "filebeat"
- codec.json:
- pretty: false
启动Filebeat。例如:
filebeat -e
Filebeat将尝试向Logstash发送消息并继续,直到Logstash可用于接收它们。
根据您安装Filebeat的方式,当您尝试运行Filebeat模块时,可能会看到与文件所有权或权限相关的错误。如果遇到与文件所有权或权限相关的错误,请参阅Beats Platform Reference 中的Config File Ownership and Permissions。
1.在安装Logstash的系统上,创建一个Logstash管道配置,该配置从Kafka输入读取并将事件发送到Elasticsearch输出:
- input {
- kafka {
- bootstrap_servers => "myhost:9092"
- topics => ["filebeat"]
- codec => json
- }
- }
-
- output {
- if [@metadata][pipeline] {
- elasticsearch {
- hosts => "https://myEShost:9200"
- manage_template => false
- index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
- pipeline => "%{[@metadata][pipeline]}"
- user => "elastic"
- password => "secret"
- }
- } else {
- elasticsearch {
- hosts => "https://myEShost:9200"
- manage_template => false
- index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
- user => "elastic"
- password => "secret"
- }
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
将pipeline
选项设置为%{[@metadata][pipeline]}
。此设置将Logstash配置为根据事件中传递的元数据选择正确的摄取管道。
如果要使用Logstash管道而不是接收节点来解析数据,请参阅使用Logstash管道下的示例中的filter
和output
设置 进行解析。
2.启动Logstash,传入刚刚定义的管道配置文件。例如:
bin/logstash -f mypipeline.conf
Logstash应该启动管道并开始从Kafka输入接收事件。
要显示Kibana中的数据,请通过将浏览器指向端口5601来启动Kibana Web界面。例如,http://127.0.0.1: 5601 。单击仪表板,然后查看Filebeat仪表板。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。