
logstash kafka input explained
input {
    kafka {
        bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"]
        topics => ["logq","applog"]     # multiple topics may be listed
        group_id => "test"              # consumer group ID
        client_id => "test"             # client ID reported to the broker
        auto_offset_reset => "latest"   # start consuming from the latest offset
        decorate_events => true         # attach topic, offset, group, partition, etc. to each event
        max_partition_fetch_bytes => "5242880"
        consumer_threads => "5"
        codec => "json"
    }
}
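
With `decorate_events => true`, the plugin attaches Kafka metadata to each event. In recent versions of the plugin this metadata lives under `[@metadata][kafka]` (older versions wrote a top-level `[kafka]` field), and `@metadata` fields are not emitted by outputs, so copy anything you want to keep into a regular field. A hedged sketch, assuming a recent plugin version:

```
filter {
    mutate {
        # Copy Kafka metadata into visible fields; field names assume
        # the [@metadata][kafka] layout of recent plugin versions.
        add_field => {
            "kafka_topic"     => "%{[@metadata][kafka][topic]}"
            "kafka_partition" => "%{[@metadata][kafka][partition]}"
        }
    }
}
```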

filter {
    if ([level] == "ERROR" or [level] == "WARN") and [systemCode] =~ "ng*" {
        ruby {
            code => 'event.set("logdate", Time.at(event.get("logTime")/1000+8*60*60).to_s)'
        }

        date {
            match => ["logdate", "yyyy-MM-dd HH:mm:ss Z"]
            target => "@timestamp"
        }

        mutate {
            add_field => { "mm" => "%{+mm}" }
        }

        ruby {
            code => 'event.set("minute", event.get("mm").to_i/10)'
        }

        mutate {
            remove_field => [ "mm", "logdate", "@version", "host", "path", "tags", "message" ]
        }
    } else {
        ruby {
            code => "event.cancel"
        }
    }
}
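
The two `ruby` filters above do the heavy lifting: `logTime` (assumed to be epoch milliseconds) is divided by 1000 to get seconds, shifted by 8 hours for UTC+8, and formatted as a date string; the minute (`mm`) divided by 10 then yields a 10-minute bucket. A minimal standalone sketch of that arithmetic, using a made-up `log_time_ms` value and `.utc` so the result does not depend on the host timezone (the original filter relies on the Logstash host's local zone):

```ruby
# Hypothetical input: epoch milliseconds, as the logTime field is assumed to be
log_time_ms = 1_700_000_000_000

# Same expression as the filter: seconds since epoch, shifted +8 hours (UTC+8)
logdate = Time.at(log_time_ms / 1000 + 8 * 60 * 60).utc

# The "mm" field is the minute of the hour; minute/10 is the 10-minute bucket
minute_bucket = logdate.min / 10

puts logdate.strftime("%Y-%m-%d %H:%M:%S")  # → 2023-11-15 06:13:20
puts minute_bucket                           # → 1
```

Integer division is what makes the bucketing work: minutes 10 through 19 all map to bucket 1, so every event in the same 10-minute window gets the same `minute` value.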

output {
    file {
        path => "/data/monitor/monitor.log.%{+YYYYMMddHH}%{minute}"
    }
}
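
The `file` output combines Logstash's `%{+YYYYMMddHH}` sprintf token (expanded from `@timestamp`) with the `minute` field computed in the filter, so events are written to one file per 10-minute window. A sketch of the resulting path, assuming a fixed UTC timestamp for illustration:

```ruby
# Assume @timestamp was set by the date filter; fixed here for illustration
timestamp = Time.utc(2023, 11, 15, 6, 13, 20)
minute_bucket = timestamp.min / 10  # the "minute" field from the filter

# Logstash expands %{+YYYYMMddHH} from @timestamp and %{minute} from the event
path = "/data/monitor/monitor.log.#{timestamp.strftime('%Y%m%d%H')}#{minute_bucket}"
puts path  # → /data/monitor/monitor.log.20231115061
```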