"localhost:2181" group_id => "test" topic_id => "test"..._logstash收集日志到es">
当前位置:   article > 正文

logstash采集数据到es_logstash收集日志到es

logstash收集日志到es

日志格式:

{"Q1":62442457475833333,"Q2":2016811232228686,"date":1556186700487}

logstash配置文件:

  1. input {
  2. kafka {
  3. zk_connect => "localhost:2181"
  4. group_id => "test"
  5. topic_id => "test"
  6. #auto_offset_reset => "smallest"
  7. }
  8. }
  9. filter {
  10. date {
  11. match => ["date", "yyyy-MM-dd HH:mm:ss.SS", "UNIX_MS"]
  12. target => "@timestamp"
  13. locale => "cn"
  14. }
  15. ruby {
  16. code => '
  17. require "json"
  18. begin
  19. event["msg_time"] = event["@timestamp"].time.getlocal("+08:00").strftime "%Y-%m-%d %H:%M:%S"
  20. event["date_id"] = event["@timestamp"].time.getlocal("+08:00").strftime "%Y-%m-%d"
  21. event["hours"] = event["@timestamp"].time.getlocal("+08:00").strftime("%H").to_i
  22. # 为了保证写到es中Q1+Q2唯一,所以先把他们组合起来
  23. event["doc_id"] = event["Q1"].to_s + event["Q2"].to_s
  24. rescue Exception => e
  25. end
  26. '
  27. }
  28. }
  29. output {
  30. if "_jsonparsefailure" not in [tags]{
  31. stdout{codec=>"rubydebug"}
  32. elasticsearch {
  33. # http默认端口9200
  34. protocol => "http"
  35. #刷新数据的大小
  36. flush_size => 5000
  37. #没满足上面配置的话,5s后也会写出数据
  38. idle_flush_time => 5
  39. # es集群
  40. cluster => "es"
  41. host => ["localhost:9200"]
  42. index => "stat-logstash-q"
  43. # 保证写入到es的数据唯一
  44. document_id => "%{doc_id}"
  45. }
  46. }
  47. }

遇到的问题:

1.刚开始写入一直报错:failed action with response of 404, dropping action: ["index", {

经排查,环境是没有问题的,后来发现是

es配置了 自动创建index的名称正则规则,而我之前的index => logstash-q,不符合规范,所以一直失败,改了符合配置的规范

index => "stat-logstash-q" 就ok了。

2.需求方要要求写入es的数据不能重复,即Q1+Q2唯一,通过查看logstash官网得知,document_id属性就是控制唯一的,同样的数据会执行update操作

所以先把Q1和Q2组装起来,然后使 document_id等于组装的值

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/994908
推荐阅读
相关标签
  

闽ICP备14008679号