当前位置:   article > 正文

Logstash【从无到有从有到无】【L9】使用Filebeat模块_logstash 判断filebeat模块

logstash 判断filebeat模块

目录

1.使用Filebeat模块

2.使用摄取管道进行解析

2.1.加载摄取管道

2.2.配置logstash以使用管道

3.使用Logstash管道进行解析

3.1.Apache 2日志

3.2.MySQL日志

3.3.Nginx日志

3.4.系统日志

4.示例:设置Filebeat模块以使用Kafka和Logstash

4.1.设置并运行Filebeat

4.2.创建并启动Logstash管道

4.3.可视化数据


1.使用Filebeat模块

Filebeat与预构建的模块打包在一起 ,其中包含收集,解析,丰富和可视化各种日志文件格式的数据所需的配置。每个Filebeat模块由一个或多个包含摄取节点管道的文件集,Elasticsearch模板,Filebeat输入配置和Kibana仪表板组成。

您可以将Filebeat模块与Logstash一起使用,但您需要进行一些额外的设置。最简单的方法是设置和使用 Filebeat提供的摄取管道。如果摄取管道不符合您的要求,您可以 创建Logstash配置以使用而不是摄取管道。

这两种方法都允许您使用Filebeat模块可用的配置,索引模板和仪表板,只要您维护索引和仪表板所需的字段结构即可。

 

2.使用摄取管道进行解析

将Filebeat模块与Logstash一起使用时,可以使用Filebeat提供的摄取管道来解析数据。您需要将管道加载到Elasticsearch并配置Logstash以使用它们。

 

2.1.加载摄取管道

在安装了Filebeat的系统上,运行setup带有--pipelines指定选项的命令 以加载特定模块的接收管道。例如,以下命令加载系统和nginx模块的摄取管道:

filebeat setup --pipelines --modules nginx,system

此安装步骤需要连接到Elasticsearch,因为Filebeat需要将摄取管道加载到Elasticsearch中。如有必要,可以在运行命令之前临时禁用已配置的输出并启用Elasticsearch输出。

 

2.2.配置logstash以使用管道

在安装Logstash的系统上,创建一个Logstash管道配置,该配置从Logstash输入(如Beats或Kafka)读取,并将事件发送到Elasticsearch输出。在Elasticsearch输出中设置pipeline选项 %{[@metadata][pipeline]}以使用先前加载的摄取管道。

这是一个示例配置,它从Beats输入读取数据并使用Filebeat摄取管道来解析模块收集的数据:

  1. input {
  2. beats {
  3. port => 5044
  4. }
  5. }
  6. output {
  7. if [@metadata][pipeline] {
  8. elasticsearch {
  9. hosts => "https://061ab24010a2482e9d64729fdb0fd93a.us-east-1.aws.found.io:9243"
  10. manage_template => false
  11. index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  12. pipeline => "%{[@metadata][pipeline]}"
  13. user => "elastic"
  14. password => "secret"
  15. }
  16. } else {
  17. elasticsearch {
  18. hosts => "https://061ab24010a2482e9d64729fdb0fd93a.us-east-1.aws.found.io:9243"
  19. manage_template => false
  20. index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  21. user => "elastic"
  22. password => "secret"
  23. }
  24. }
  25. }

pipeline选项设置为%{[@metadata][pipeline]}。此设置将Logstash配置为根据事件中传递的元数据选择正确的摄取管道。

有关设置和运行模块的更多信息,请参阅Filebeat 模块文档。

有关完整示例,请参阅示例:设置Filebeat模块以使用Kafka和Logstash

 

3.使用Logstash管道进行解析

本节中的示例显示如何构建Logstash管道配置,以替换Filebeat模块提供的接收管道。管道获取Filebeat模块收集的数据,将其解析为Filebeat索引所期望的字段,并将字段发送到Elasticsearch,以便可以可视化Filebeat提供的预构建仪表板中的数据。

这种方法比使用现有的提取管道解析数据更耗时,但它可以让您更好地控制数据的处理方式。通过编写自己的管道配置,您可以在提取字段后执行其他处理,例如删除字段,或者可以将负载从Elasticsearch提取节点移动到Logstash节点。

在决定使用Logstash配置替换摄取管道之前,请阅读使用摄取管道进行分析

以下是一些示例,说明如何实现Logstash配置以替换摄取管道:

Logstash提供了一个摄取管道转换工具 ,可帮助您将摄取管道定义迁移到Logstash配置。该工具目前不支持所有可用于摄取节点的处理器,但它是一个很好的起点。

 

3.1.Apache 2日志

此示例中的Logstash管道配置显示了如何发送和解析apacheFilebeat模块收集的访问和错误日​​志 。

  1. input {
  2. beats {
  3. port => 5044
  4. host => "0.0.0.0"
  5. }
  6. }
  7. filter {
  8. if [fileset][module] == "apache2" {
  9. if [fileset][name] == "access" {
  10. grok {
  11. match => { "message" => ["%{IPORHOST:[apache2][access][remote_ip]} - %{DATA:[apache2][access][user_name]} \[%{HTTPDATE:[apache2][access][time]}\] \"%{WORD:[apache2][access][method]} %{DATA:[apache2][access][url]} HTTP/%{NUMBER:[apache2][access][http_version]}\" %{NUMBER:[apache2][access][response_code]} %{NUMBER:[apache2][access][body_sent][bytes]}( \"%{DATA:[apache2][access][referrer]}\")?( \"%{DATA:[apache2][access][agent]}\")?",
  12. "%{IPORHOST:[apache2][access][remote_ip]} - %{DATA:[apache2][access][user_name]} \\[%{HTTPDATE:[apache2][access][time]}\\] \"-\" %{NUMBER:[apache2][access][response_code]} -" ] }
  13. remove_field => "message"
  14. }
  15. mutate {
  16. add_field => { "read_timestamp" => "%{@timestamp}" }
  17. }
  18. date {
  19. match => [ "[apache2][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
  20. remove_field => "[apache2][access][time]"
  21. }
  22. useragent {
  23. source => "[apache2][access][agent]"
  24. target => "[apache2][access][user_agent]"
  25. remove_field => "[apache2][access][agent]"
  26. }
  27. geoip {
  28. source => "[apache2][access][remote_ip]"
  29. target => "[apache2][access][geoip]"
  30. }
  31. }
  32. else if [fileset][name] == "error" {
  33. grok {
  34. match => { "message" => ["\[%{APACHE_TIME:[apache2][error][timestamp]}\] \[%{LOGLEVEL:[apache2][error][level]}\]( \[client %{IPORHOST:[apache2][error][client]}\])? %{GREEDYDATA:[apache2][error][message]}",
  35. "\[%{APACHE_TIME:[apache2][error][timestamp]}\] \[%{DATA:[apache2][error][module]}:%{LOGLEVEL:[apache2][error][level]}\] \[pid %{NUMBER:[apache2][error][pid]}(:tid %{NUMBER:[apache2][error][tid]})?\]( \[client %{IPORHOST:[apache2][error][client]}\])? %{GREEDYDATA:[apache2][error][message1]}" ] }
  36. pattern_definitions => {
  37. "APACHE_TIME" => "%{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}"
  38. }
  39. remove_field => "message"
  40. }
  41. mutate {
  42. rename => { "[apache2][error][message1]" => "[apache2][error][message]" }
  43. }
  44. date {
  45. match => [ "[apache2][error][timestamp]", "EEE MMM dd H:m:s YYYY", "EEE MMM dd H:m:s.SSSSSS YYYY" ]
  46. remove_field => "[apache2][error][timestamp]"
  47. }
  48. }
  49. }
  50. }
  51. output {
  52. elasticsearch {
  53. hosts => localhost
  54. manage_template => false
  55. index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  56. }
  57. }

 

3.2.MySQL日志

此示例中的Logstash管道配置显示了如何发送和解析由mysqlFilebeat模块收集的错误和慢速日志日志。

  1. input {
  2. beats {
  3. port => 5044
  4. host => "0.0.0.0"
  5. }
  6. }
  7. filter {
  8. if [fileset][module] == "mysql" {
  9. if [fileset][name] == "error" {
  10. grok {
  11. match => { "message" => ["%{LOCALDATETIME:[mysql][error][timestamp]} (\[%{DATA:[mysql][error][level]}\] )?%{GREEDYDATA:[mysql][error][message]}",
  12. "%{TIMESTAMP_ISO8601:[mysql][error][timestamp]} %{NUMBER:[mysql][error][thread_id]} \[%{DATA:[mysql][error][level]}\] %{GREEDYDATA:[mysql][error][message1]}",
  13. "%{GREEDYDATA:[mysql][error][message2]}"] }
  14. pattern_definitions => {
  15. "LOCALDATETIME" => "[0-9]+ %{TIME}"
  16. }
  17. remove_field => "message"
  18. }
  19. mutate {
  20. rename => { "[mysql][error][message1]" => "[mysql][error][message]" }
  21. }
  22. mutate {
  23. rename => { "[mysql][error][message2]" => "[mysql][error][message]" }
  24. }
  25. date {
  26. match => [ "[mysql][error][timestamp]", "ISO8601", "YYMMdd H:m:s" ]
  27. remove_field => "[mysql][error][time]"
  28. }
  29. }
  30. else if [fileset][name] == "slowlog" {
  31. grok {
  32. match => { "message" => ["^# User@Host: %{USER:[mysql][slowlog][user]}(\[[^\]]+\])? @ %{HOSTNAME:[mysql][slowlog][host]} \[(IP:[mysql][slowlog][ip])?\](\s*Id:\s* %{NUMBER:[mysql][slowlog][id]})?\n# Query_time: %{NUMBER:[mysql][slowlog][query_time][sec]}\s* Lock_time: %{NUMBER:[mysql][slowlog][lock_time][sec]}\s* Rows_sent: %{NUMBER:[mysql][slowlog][rows_sent]}\s* Rows_examined: %{NUMBER:[mysql][slowlog][rows_examined]}\n(SET timestamp=%{NUMBER:[mysql][slowlog][timestamp]};\n)?%{GREEDYMULTILINE:[mysql][slowlog][query]}"] }
  33. pattern_definitions => {
  34. "GREEDYMULTILINE" => "(.|\n)*"
  35. }
  36. remove_field => "message"
  37. }
  38. date {
  39. match => [ "[mysql][slowlog][timestamp]", "UNIX" ]
  40. }
  41. mutate {
  42. gsub => ["[mysql][slowlog][query]", "\n# Time: [0-9]+ [0-9][0-9]:[0-9][0-9]:[0-9][0-9](\\.[0-9]+)?$", ""]
  43. }
  44. }
  45. }
  46. }
  47. output {
  48. elasticsearch {
  49. hosts => localhost
  50. manage_template => false
  51. index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  52. }
  53. }

 

3.3.Nginx日志

此示例中的Logstash管道配置显示了如何发送和解析nginxFilebeat模块收集的访问和错误日​​志 。

  1. input {
  2. beats {
  3. port => 5044
  4. host => "0.0.0.0"
  5. }
  6. }
  7. filter {
  8. if [fileset][module] == "nginx" {
  9. if [fileset][name] == "access" {
  10. grok {
  11. match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] }
  12. remove_field => "message"
  13. }
  14. mutate {
  15. add_field => { "read_timestamp" => "%{@timestamp}" }
  16. }
  17. date {
  18. match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
  19. remove_field => "[nginx][access][time]"
  20. }
  21. useragent {
  22. source => "[nginx][access][agent]"
  23. target => "[nginx][access][user_agent]"
  24. remove_field => "[nginx][access][agent]"
  25. }
  26. geoip {
  27. source => "[nginx][access][remote_ip]"
  28. target => "[nginx][access][geoip]"
  29. }
  30. }
  31. else if [fileset][name] == "error" {
  32. grok {
  33. match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
  34. remove_field => "message"
  35. }
  36. mutate {
  37. rename => { "@timestamp" => "read_timestamp" }
  38. }
  39. date {
  40. match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]
  41. remove_field => "[nginx][error][time]"
  42. }
  43. }
  44. }
  45. }
  46. output {
  47. elasticsearch {
  48. hosts => localhost
  49. manage_template => false
  50. index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  51. }
  52. }

 

3.4.系统日志

此示例中的Logstash管道配置显示了如何发送和解析由systemFilebeat模块收集的系统日志 。

  1. input {
  2. beats {
  3. port => 5044
  4. host => "0.0.0.0"
  5. }
  6. }
  7. filter {
  8. if [fileset][module] == "system" {
  9. if [fileset][name] == "auth" {
  10. grok {
  11. match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
  12. "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
  13. "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
  14. "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
  15. "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}",
  16. "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][useradd][name]}, UID=%{NUMBER:[system][auth][useradd][uid]}, GID=%{NUMBER:[system][auth][useradd][gid]}, home=%{DATA:[system][auth][useradd][home]}, shell=%{DATA:[system][auth][useradd][shell]}$",
  17. "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
  18. pattern_definitions => {
  19. "GREEDYMULTILINE"=> "(.|\n)*"
  20. }
  21. remove_field => "message"
  22. }
  23. date {
  24. match => [ "[system][auth][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
  25. }
  26. geoip {
  27. source => "[system][auth][ssh][ip]"
  28. target => "[system][auth][ssh][geoip]"
  29. }
  30. }
  31. else if [fileset][name] == "syslog" {
  32. grok {
  33. match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
  34. pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
  35. remove_field => "message"
  36. }
  37. date {
  38. match => [ "[system][syslog][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
  39. }
  40. }
  41. }
  42. }
  43. output {
  44. elasticsearch {
  45. hosts => localhost
  46. manage_template => false
  47. index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  48. }
  49. }

 

4.示例:设置Filebeat模块以使用Kafka和Logstash

本节介绍如何在发布管道中的Filebeat和Logstash之间使用Kafka时设置Filebeat 模块以使用Logstash。此示例的主要目标是展示如何从Filebeat加载摄取管道并将其与Logstash一起使用。

本节中的示例显示了主题名称为硬编码的简单配置。有关配置选项的完整列表,请参阅有关配置Kafka输入插件的文档。另请参阅 在Filebeat Reference中配置Kafka输出

 

4.1.设置并运行Filebeat

  1. 如果尚未设置Filebeat索引模板和示例Kibana仪表板,请运行Filebeat setup命令立即执行此操作:

    filebeat -e setup

    -e标志是可选的,并将输出发送到标准错误而不是syslog。

    此一次性设置步骤需要连接到Elasticsearch和Kibana,因为Filebeat需要在Elasticsearch中创建索引模板并将示例仪表板加载到Kibana中。有关配置与Elasticsearch的连接的详细信息,请参阅Filebeat模块快速入门 。

    加载模板和仪表板后,您将看到该消息INFO {kib} dashboards successfully loaded. Loaded dashboards

  2. 运行该modules enable命令以启用要运行的模块。例如:

    filebeat modules enable system

    您可以通过编辑Filebeat modules.d目录下的配置文件来进一步配置模块。例如,如果日志文件不在模块预期的位置,则可以设置该var.paths选项。

  3. setup使用指定的--pipelines--modules选项运行命令,以加载已启用模块的接收管道。此步骤还需要连接到Elasticsearch。如果要使用Logstash管道而不是摄取节点来解析数据,请跳过此步骤。

    filebeat setup --pipelines --modules system
  4. 配置Filebeat以将日志行发送到Kafka。为此,在 filebeat.yml配置文件中,通过注释掉它来禁用Elasticsearch输出,并启用Kafka输出。例如:

    1. #output.elasticsearch:
    2. #hosts: ["localhost:9200"]
    3. output.kafka:
    4. hosts: ["kafka:9092"]
    5. topic: "filebeat"
    6. codec.json:
    7. pretty: false
  5. 启动Filebeat。例如:

    filebeat -e

    Filebeat将尝试向Logstash发送消息并继续,直到Logstash可用于接收它们。

     

    根据您安装Filebeat的方式,当您尝试运行Filebeat模块时,可能会看到与文件所有权或权限相关的错误。如果遇到与文件所有权或权限相关的错误,请参阅Beats Platform Reference 中的Config File Ownership and Permissions

 

4.2.创建并启动Logstash管道

1.在安装Logstash的系统上,创建一个Logstash管道配置,该配置从Kafka输入读取并将事件发送到Elasticsearch输出:

  1. input {
  2. kafka {
  3. bootstrap_servers => "myhost:9092"
  4. topics => ["filebeat"]
  5. codec => json
  6. }
  7. }
  8. output {
  9. if [@metadata][pipeline] {
  10. elasticsearch {
  11. hosts => "https://myEShost:9200"
  12. manage_template => false
  13. index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  14. pipeline => "%{[@metadata][pipeline]}"
  15. user => "elastic"
  16. password => "secret"
  17. }
  18. } else {
  19. elasticsearch {
  20. hosts => "https://myEShost:9200"
  21. manage_template => false
  22. index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  23. user => "elastic"
  24. password => "secret"
  25. }
  26. }
  27. }

pipeline选项设置为%{[@metadata][pipeline]}。此设置将Logstash配置为根据事件中传递的元数据选择正确的摄取管道。

如果要使用Logstash管道而不是接收节点来解析数据,请参阅使用Logstash管道下的示例中的filteroutput设置 进行解析

 

2.启动Logstash,传入刚刚定义的管道配置文件。例如:

bin/logstash -f mypipeline.conf

Logstash应该启动管道并开始从Kafka输入接收事件。

 

4.3.可视化数据

要显示Kibana中的数据,请通过将浏览器指向端口5601来启动Kibana Web界面。例如,http://127.0.0.1: 5601 。单击仪表板,然后查看Filebeat仪表板。

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/534689
推荐阅读
相关标签
  

闽ICP备14008679号