
Shipping logs from logback to ELK via Kafka

Study notes based on a reference guide to setting up logging with logback + kafka + ELK.

Log flow: logback -> kafka -> logstash -> elasticsearch -> kibana

Installing and starting Kafka

  • Download from the official site, choosing the Binary downloads option
  • Start ZooKeeper first
    bin/zookeeper-server-start.sh config/zookeeper.properties &
  • Then start Kafka
    bin/kafka-server-start.sh config/server.properties &
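  • The appender configured below writes to a topic named applog. Depending on your broker settings it may be auto-created, but you can create and verify it explicitly; note the --bootstrap-server flag applies to Kafka 2.2+, while older releases use --zookeeper localhost:2181 instead
    bin/kafka-topics.sh --create --topic applog --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
    bin/kafka-topics.sh --list --bootstrap-server localhost:9092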

Integrating logback with Kafka

The integration between logback and Kafka uses logback-kafka-appender.

  • Add the Maven dependencies
<!-- Kafka dependency -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.6.RELEASE</version>
</dependency>
<!-- logback-kafka-appender dependency -->
<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.2.0-RC2</version>
</dependency>
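  • Note: logback-kafka-appender needs kafka-clients on the classpath at runtime; spring-kafka brings it in transitively. A quick way to confirm (generic Maven usage, not from the original guide)
    mvn dependency:tree | grep kafka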
  • Configure logback-spring.xml:

Spring Boot looks for logging configuration in this order: logback-spring.xml > logback-spring.groovy > logback.xml > logback.groovy

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
    <contextName>logback</contextName>
    <!-- Log file storage location; avoid relative paths in logback configuration -->
    <property name="LOG_HOME" value="/data/logs" />
    <!-- Console output -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <!-- Pattern: %d = date, %thread = thread name, %-5level = level padded to 5 characters, %msg = log message, %n = newline -->
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
        <topic>applog</topic>
        <!-- we don't care how the log messages will be partitioned  -->
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />

        <!-- use async delivery. the application threads are not blocked by logging -->
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />

        <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
        <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
        <!-- bootstrap.servers is the only mandatory producerConfig -->
        <producerConfig>bootstrap.servers=localhost:9092</producerConfig>
        <!-- don't wait for a broker to ack the reception of a batch.  -->
        <producerConfig>acks=0</producerConfig>
        <!-- wait up to 1000ms and collect log messages before sending them as a batch -->
        <producerConfig>linger.ms=1000</producerConfig>
        <!-- even if the producer buffer runs full, do not block the application but start to drop messages -->
        <producerConfig>max.block.ms=0</producerConfig>
        <!-- define a client-id that you use to identify yourself against the kafka broker -->
        <producerConfig>client.id=${HOSTNAME}-${CONTEXT_NAME}-logback-relaxed</producerConfig>
    </appender>

    <root level="info">
        <appender-ref ref="console" />
        <appender-ref ref="kafkaAppender" />
    </root>
</configuration>
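Before adding Logstash to the pipeline, it is worth checking that messages reach the topic at all. Once an application using this configuration is running, Kafka's bundled console consumer shows them directly (a sanity check, not part of the original write-up):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic applog --from-beginning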


Configuring and starting Logstash

For installing and using the ELK stack itself, see the separate ELK installation guide.

  • Configure Logstash to read from Kafka and write to Elasticsearch, using the index test-kafka (saved here as test-kafka.conf)
input {
  kafka {
    topics => "applog"
    bootstrap_servers => "localhost:9092"
    group_id => "es"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "test-kafka"
  }
}
  • Start Logstash with this config
    ./bin/logstash -f test-kafka.conf
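  • If startup fails, you can validate the config file first; the --config.test_and_exit flag is available in Logstash 5.x and later (adjust for older versions)
    ./bin/logstash -f test-kafka.conf --config.test_and_exit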

Starting Elasticsearch

./bin/elasticsearch
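
A quick sanity check over the standard HTTP API: the first call confirms the node is up, the second lists indices (test-kafka should appear once logs start flowing):

curl http://localhost:9200
curl "http://localhost:9200/_cat/indices?v"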

Starting Kibana

./bin/kibana

Verification

  • Start a program that emits logs
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@Slf4j
@SpringBootApplication
public class LogKafkaApplication {

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(LogKafkaApplication.class, args);

        // Emit a log line every 5 seconds; each line goes to both the
        // console appender and the kafkaAppender configured above
        while (true) {
            Thread.sleep(5000);
            log.info("log to kafka...");
        }
    }

}
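You can also query Elasticsearch directly to confirm documents are being indexed, before opening Kibana (standard _search API):

curl "http://localhost:9200/test-kafka/_search?pretty"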

Open http://127.0.0.1:5601: the log entries show up under the test-kafka index, as shown below.
(Screenshot: log entries under the test-kafka index in Kibana)
