当前位置:   article > 正文

使用 Logstash 丰富你的 Elasticsearch 文档

使用 Logstash 丰富你的 Elasticsearch 文档

作者:来自 Elastic David Pilato

我们在上一篇文章中看到,我们可以使用摄取管道中的 Elasticsearch Enrich ProcessorElasticsearch® 中进行数据丰富。 但有时,你需要执行更复杂的任务,或者你的数据源不是 Elasticsearch,而是另一个源。 或者,你可能希望存储在 Elasticsearch 和第三方系统中,在这种情况下,将管道的执行转移到 Logstash® 很有意义。

使用 Elasticsearch 丰富 Elasticsearch 数据

使用 Logstash,使用类似于以下的管道,这非常容易:

  1. input {
  2. # Read all documents from Elasticsearch
  3. elasticsearch {
  4. hosts => ["${ELASTICSEARCH_URL}"]
  5. user => "elastic"
  6. password => "${ELASTIC_PASSWORD}"
  7. index => "kibana_sample_data_logs"
  8. docinfo => true
  9. ecs_compatibility => "disabled"
  10. }
  11. }
  12. filter {
  13. # Enrich every document with Elasticsearch
  14. elasticsearch {
  15. hosts => ["${ELASTICSEARCH_URL}"]
  16. user => "elastic"
  17. password => "${ELASTIC_PASSWORD}"
  18. index => "vip"
  19. query => "ip:%{[clientip]}"
  20. sort => "ip:desc"
  21. fields => {
  22. "[name]" => "[name]"
  23. "[vip]" => "[vip]"
  24. }
  25. }
  26. mutate {
  27. remove_field => ["@version", "@timestamp"]
  28. }
  29. }
  30. output {
  31. if [name] {
  32. # Write all modified documents to Elasticsearch
  33. elasticsearch {
  34. manage_template => false
  35. hosts => ["${ELASTICSEARCH_URL}"]
  36. user => "elastic"
  37. password => "${ELASTIC_PASSWORD}"
  38. index => "%{[@metadata][_index]}"
  39. document_id => "%{[@metadata][_id]}"
  40. }
  41. }
  42. }

总共,我们有 14074 个事件需要解析。 虽然不是很多,但对于这个演示来说已经足够了。 这是一个示例事件:

  1. {
  2. "agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
  3. "bytes": 1831,
  4. "clientip": "30.156.16.164",
  5. "extension": "",
  6. "geo": {
  7. "srcdest": "US:IN",
  8. "src": "US",
  9. "dest": "IN",
  10. "coordinates": {
  11. "lat": 55.53741389,
  12. "lon": -132.3975144
  13. }
  14. },
  15. "host": "elastic-elastic-elastic.org",
  16. "index": "kibana_sample_data_logs",
  17. "ip": "30.156.16.163",
  18. "machine": {
  19. "ram": 9663676416,
  20. "os": "win xp"
  21. },
  22. "memory": 73240,
  23. "message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"",
  24. "phpmemory": 73240,
  25. "referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra",
  26. "request": "/wp-login.php",
  27. "response": 404,
  28. "tags": [
  29. "success",
  30. "info"
  31. ],
  32. "timestamp": "2023-03-18T12:43:49.756Z",
  33. "url": "https://elastic-elastic-elastic.org/wp-login.php",
  34. "utc_time": "2023-03-18T12:43:49.756Z",
  35. "event": {
  36. "dataset": "sample_web_logs"
  37. }
  38. }

正如我们在上一篇文章中看到的,vip 索引包含有关我们客户的信息:

  1. {
  2. "ip" : "30.156.16.164",
  3. "vip": true,
  4. "name": "David P"
  5. }

我们可以通过以下方式运行管道:

  1. docker run \
  2. --name=logstash \
  3. --rm -it \
  4. -v $(pwd)/logstash-config/pipeline/:/usr/share/logstash/pipeline/ \
  5. -e XPACK_MONITORING_ENABLED=false \
  6. -e ELASTICSEARCH_URL="$ELASTICSEARCH_URL" \
  7. -e ELASTIC_PASSWORD="$ELASTIC_PASSWORD" \
  8. docker.elastic.co/logstash/logstash:8.12.0

丰富的文档现在看起来像这样:

  1. {
  2. "agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
  3. "bytes": 1831,
  4. "clientip": "30.156.16.164",
  5. "extension": "",
  6. "geo": {
  7. "srcdest": "US:IN",
  8. "src": "US",
  9. "dest": "IN",
  10. "coordinates": {
  11. "lat": 55.53741389,
  12. "lon": -132.3975144
  13. }
  14. },
  15. "host": "elastic-elastic-elastic.org",
  16. "index": "kibana_sample_data_logs",
  17. "ip": "30.156.16.163",
  18. "machine": {
  19. "ram": 9663676416,
  20. "os": "win xp"
  21. },
  22. "memory": 73240,
  23. "message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"",
  24. "phpmemory": 73240,
  25. "referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra",
  26. "request": "/wp-login.php",
  27. "response": 404,
  28. "tags": [
  29. "success",
  30. "info"
  31. ],
  32. "timestamp": "2023-03-18T12:43:49.756Z",
  33. "url": "https://elastic-elastic-elastic.org/wp-login.php",
  34. "utc_time": "2023-03-18T12:43:49.756Z",
  35. "event": {
  36. "dataset": "sample_web_logs"
  37. },
  38. "vip": true,
  39. "name": "David P"
  40. }

实际上很简单,但有一个问题:速度很慢。 通过网络进行查找,尽管 Elasticsearch 速度极快,但仍然会减慢整个管道的速度。

使用静态 JDBC 过滤器

我最近在 ParisJUG 遇到了 Laurent,他来自令人惊叹的 Elastic Consulting 团队,我们讨论了这个问题。 他告诉我,他的一位客户必须面对这个问题。 他建议改用 Logstash 中的 Elasticsearch 缓存。

问题是:Logstash 中没有这样的过滤器缓存插件。 他找到了一种非常聪明的方法来解决该问题,即利用静态 JDBC 过滤器插件Elasticsearch JDBC 驱动程序

请注意,这需要拥有白金许可证(或试用版)。

添加 Elasticsearch JDBC 驱动程序

我们首先需要将 JDBC 驱动程序添加到 Logstash 实例中。

  1. mdir -p logstash-config/lib
  2. wget https://artifacts.elastic.co/maven/org/elasticsearch/plugin/x-pack-sql-jdbc/8.12.0/x-pack-sql-jdbc-8.12.0.jar
  3. mv x-pack-sql-jdbc-8.12.0.jar logstash-config/lib

我们只需要与 Logstash docker 实例共享此目录:

  1. time docker run \
  2. --name=logstash \
  3. --rm -it \
  4. -v $(pwd)/logstash-config/pipeline/:/usr/share/logstash/pipeline/ \
  5. -v $(pwd)/logstash-config/lib/:/tmp/lib/ \
  6. -e XPACK_MONITORING_ENABLED=false \
  7. -e ELASTICSEARCH_URL="$ELASTICSEARCH_URL" \
  8. -e ELASTIC_PASSWORD="$ELASTIC_PASSWORD" \
  9. docker.elastic.co/logstash/logstash:8.12.0

更新管道

input 部分不变。 但现在,我们要在内存中创建一个名为 vip 的临时表(为了保持一致性)。 该表结构是使用 local_db_objects 参数定义的:

  1. jdbc_static {
  2. local_db_objects => [ {
  3. name => "vip"
  4. index_columns => ["ip"]
  5. columns => [
  6. ["name", "VARCHAR(255)"],
  7. ["vip", "BOOLEAN"],
  8. ["ip", "VARCHAR(64)"]
  9. ]
  10. } ]
  11. }

当 jdbc_static 启动时,我们要首先从 Elasticsearch vip索引中读取所有数据集。 这是在 loaders 选项中完成的:

  1. jdbc_static {
  2. loaders => [ {
  3. query => "select name, vip, ip from vip"
  4. local_table => "vip"
  5. } ]
  6. jdbc_user => "elastic"
  7. jdbc_password => "${ELASTIC_PASSWORD}"
  8. jdbc_driver_class => "org.elasticsearch.xpack.sql.jdbc.EsDriver"
  9. jdbc_driver_library => "/tmp/lib/x-pack-sql-jdbc-8.12.0.jar"
  10. jdbc_connection_string => "jdbc:es://${ELASTICSEARCH_URL}"
  11. }

每次我们需要进行查找时,我们都希望使用以下语句来执行它:

SELECT name, vip FROM vip WHERE ip = "THE_IP"

这可以使用 local_lookups 参数定义:

  1. jdbc_static {
  2. local_lookups => [ {
  3. query => "SELECT name, vip FROM vip WHERE ip = :ip"
  4. parameters => { "ip" => "clientip" }
  5. target => "vip"
  6. } ]
  7. }

如果没有找到数据,我们可以使用 default_hash 选项提供默认值:

  1. jdbc_static {
  2. local_lookups => [ {
  3. query => "SELECT name, vip FROM vip WHERE ip = :ip"
  4. parameters => { "ip" => "clientip" }
  5. target => "vip"
  6. default_hash => {
  7. name => nil
  8. vip => false
  9. }
  10. } ]
  11. }

最后,这将在事件中生成 vip.name 和 vip.vip 字段。

我们现在可以定义我们想要对这些临时字段执行的操作:

  1. jdbc_static {
  2. add_field => { name => "%{[vip][0][name]}" }
  3. add_field => { vip => "%{[vip][0][vip]}" }
  4. remove_field => ["vip"]
  5. }

这给出了以下过滤器:

  1. filter {
  2. # Enrich every document with Elasticsearch via static JDBC
  3. jdbc_static {
  4. loaders => [ {
  5. query => "select name, vip, ip from vip"
  6. local_table => "vip"
  7. } ]
  8. local_db_objects => [ {
  9. name => "vip"
  10. index_columns => ["ip"]
  11. columns => [
  12. ["name", "VARCHAR(255)"],
  13. ["vip", "BOOLEAN"],
  14. ["ip", "VARCHAR(64)"]
  15. ]
  16. } ]
  17. local_lookups => [ {
  18. query => "SELECT name, vip FROM vip WHERE ip = :ip"
  19. parameters => { "ip" => "clientip" }
  20. target => "vip"
  21. default_hash => {
  22. name => nil
  23. vip => false
  24. }
  25. } ]
  26. add_field => { name => "%{[vip][0][name]}" }
  27. add_field => { vip => "%{[vip][0][vip]}" }
  28. remove_field => ["vip"]
  29. jdbc_user => "elastic"
  30. jdbc_password => "${ELASTIC_PASSWORD}"
  31. jdbc_driver_class => "org.elasticsearch.xpack.sql.jdbc.EsDriver"
  32. jdbc_driver_library => "/tmp/lib/x-pack-sql-jdbc-8.12.0.jar"
  33. jdbc_connection_string => "jdbc:es://${ELASTICSEARCH_URL}"
  34. }
  35. mutate {
  36. remove_field => ["@version", "@timestamp"]
  37. }
  38. }

将修改后的文档写入Elasticsearch

在第一个管道中,我们测试事件中是否确实存在名称字段:

  1. if [name] {
  2. # Index to Elasticsearch
  3. }

我们仍然可以使用类似的东西,但因为我们提供了默认值,以防在 Elasticsearch vip 索引中找不到 ip,所以现在它会在标签表中生成一个新的 _jdbcstaticdefaultsused 标签。

我们可以用它来知道我们是否发现了某些东西,如果是前者,则将我们的数据发送到 Elasticsearch:

  1. output {
  2. if "_jdbcstaticdefaultsused" not in [tags] {
  3. # Write all the modified documents to Elasticsearch
  4. elasticsearch {
  5. manage_template => false
  6. hosts => ["${ELASTICSEARCH_URL}"]
  7. user => "elastic"
  8. password => "${ELASTIC_PASSWORD}"
  9. index => "%{[@metadata][_index]}"
  10. document_id => "%{[@metadata][_id]}"
  11. }
  12. }
  13. }

更快吗?

因此,当我们在这个小数据集上运行测试时,我们可以看到,使用 Elasticsearch 过滤器方法,需要两分钟多一点的时间来丰富我们的数据集:

  1. real 2m3.146s
  2. user 0m0.077s
  3. sys 0m0.042s

当使用 JDBC 静态过滤器方法运行管道时,现在只需不到一分钟:

  1. real 0m48.575s
  2. user 0m0.064s
  3. sys 0m0.039s

正如我们所看到的,我们显着减少了该丰富管道的执行时间(增益约为 60%)。

如果你有一个可以轻松放入 Logstash JVM 内存的小型 Elasticsearch 索引,你可以尝试此策略(或类似的策略)。 如果你有数亿个文档,你仍然应该使用 Elasticsearch Filter Plugin

结论

在这篇文章中,我们了解了当我们需要在 Elasticsearch 中执行一些查找时,如何使用 JDBC 静态过滤器插件来加速数据丰富管道。 在下一篇文章中,我们将了解如何使用 Elastic Agent 在边缘进行类似的丰富。

本文中描述的任何特性或功能的发布和时间安排均由 Elastic 自行决定。 当前不可用的任何特性或功能可能无法按时交付或根本无法交付

更多阅读:

原文:Enrich your Elasticsearch documents with Logstash | Elastic Blog

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/242378
推荐阅读
相关标签
  

闽ICP备14008679号