当前位置:   article > 正文

SpringBoot整合ELK8.1.x实现日志中心教程

SpringBoot整合ELK8.1.x实现日志中心教程

目录

背景

环境准备

环境安装

1.JDK安装

2.安装Elasticsearch

3.安装zookeeper

4.安装Kafka

5.安装logstash

6.安装file beat

解决方案场景

1.日志采集

1.1 应用日志配置

1.1.1 创建logback-spring.xml文件

1.1.2 创建LoggerFactory

1.1.3 trace日志的记录用法

1.1.4 dap-trace日志的记录用法

1.2 应用日志采集

2.日志缓冲

3.日志分析

4.日志存储

5.小总结

6.日志查看


背景

最近公司需要把产品中用到的组件做一下升级,从6.x升级到8.x,本人负责做升级,搞完之后一顿空虚,感觉总少点什么没完成,没错,忘记写点什么了,因此记录下从应用产生日志、filebeat收集日志、发送日志到kafka、logstash完成对日志的结构整理、最终入库es完整链路的方案以及一些配置的说明,希望能帮助到有需求的童鞋们,如果是入门的童鞋,照着本教程一步一步来的话,不出意外的话肯定是能手搓出一套ELK的日志中心的方案出来的;整理不易,望诸君高台贵手,点赞支持。

环境准备

应用:Springboot、Springcloud应用,版本不限

jvm:1.8、17,因为从es8.0.x之后就不支持jdk1.8.x了,本文选的版本是8.1.x,所以需要准备jdk17的环境,但是有些组件还是不能够支持17,因此还需要1.8的环境,具体支持下面会做详细介绍

filebeat、logstash、elasticsearch:8.1.2 

filebeat下载地址:Past Releases of Elastic Stack Software | Elasticicon-default.png?t=N7T8https://www.elastic.co/cn/downloads/past-releases#filebeat

logstash下载地址:Past Releases of Elastic Stack Software | Elasticicon-default.png?t=N7T8https://www.elastic.co/downloads/past-releases#logstash

es下载地址:Past Releases of Elastic Stack Software | Elasticicon-default.png?t=N7T8https://www.elastic.co/downloads/past-releases#elasticsearch

kafka:2.5.1;filebeat8.1.2支持的版本是0.8.2.0 and 2.6.0之间的所有版本,因此选择的版本是2.5.1,也就是2.6.0之前的最后一个release版本,具体支持的详细说明请移步官网介绍:Configure the Kafka output | Filebeat Reference [8.12] | Elastic

下载地址:Apache Kafka

zookeeper:2.5.8,kafka2.5.1对应的zk版本就是2.5.8,如何通过kafka确定zk的对应版本,本人有个方式:去官网下载Kafka的源码:以-src.tgz结尾的就是源码,下载完之后解压,打开解压目录下的/gradle目录,然后打开dependencies.gradle文件,然后搜索zookeeper,你就会看到有个配置如下:"zookeeper":"2.5.8" 这个就是当前kafka版本对应的zk的版本。

zookeeper下载地址:Index of /dist/zookeeper (apache.org)  注意:下载-bin.tar.gz结尾的文件,这个是编译好的介质,不带-bin的是源码,需要自己编译。

sky walking:9.3.0 为什么选择这个版本,因为从9.3.0开始才支持es8.x,之前的版本都只能支持最高到7.x,因此选择这个版本,当然也可以选择更高的版本,本文选择的是9.3.0版本。

下载地址:Downloads | Apache SkyWalking

自此所需要的介质以及对应版本都ok了,接下来就开始安装服务了,先从es开始安装,因为它是最底层环节,别人都需要靠它才能完成最终安装。

环境安装

1.JDK安装

官网下载jdk17,小版本随意,下载完之后上传至服务器,或者直接用wget命令也行,随意

解压完之后,按照如下命令进行全局变量配置

  1. vim /etc/profile
  2. export JAVA_HOME=/usr/lib/jdk17/jdk-17.0.10
  3. export CLASSPATH=.:$JAVA_HOME/lib/
  4. export PATH=.:$JAVA_HOME/bin:$PATH
  5. # 生效
  6. source /etc/profile

2.安装Elasticsearch

先新建一个用户(出于安全考虑,elasticsearch默认不允许以root账号运行)

  1. # 创建组
  2. groupadd es
  3. # 创建用户
  4. useradd -m -g es es
  5. # 设置密码 如果提示密码过于简单或少于8个字符则可将密码设置的复杂一点
  6. passwd es

新建目录/opt/elk/es,然后上传介质并解压到指定目录 tar -zxvf elasticsearch-8.1.2-linux-x86_64.tar.gz -C /opt/elk/es

解压后目录结构如下:

修改Elasticsearch配置文件 文件路径{安装位置}/config/elasticsearch.yml,8.1.2默认是开启了ssl的,所以如果关闭则需要在配置文件中关闭,具体配置如下:

  1. xpack.security.enabled: false
  2. xpack.security.transport.ssl.enabled: false
  3. xpack.security.http.ssl.enabled: false
  4. # 修改es的ip地址
  5. network.host:{安装机器IP}
  6. node.name: node-1
  7. cluster.initial_master_nodes: ["node-1"]

如果需要开启ssl则只需要配置下面三个配置即可,接下来就是JVM参数的调优,因为es比较吃cpu和内存,如果你的机器够大,建议设置一下jvm的参数,在{安装位置}/config/jvm.options文件中,增加:

  1. -Xms10g
  2. -Xmx10g

启动Elasticsearch

  1. # 进入bin目录
  2. cd {安装位置}/bin
  3. # 启动Elasticsearch
  4. ./elasticsearch -d

验证:访问ip:9200 看到如下返回,则启动成功,到此es初步安装完成,后面等所有环境安装完成之后还会根据业务场景对ELK各组件进行设置调整

3.安装zookeeper

注:2.5.8版本的zk还不支持jdk17,因此zk以及后面的kafka都不能安装在jdk17的环境下,本人选择的是jdk1.8,因此童鞋们注意下。

tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz 解压介质,解压完之后是这样

解压完之后进入到{解压目录}/conf目录 

  1. cd {安装目录}/conf
  2. #修改conf目录下的zoo_sample.cfg 改成zoo.cfg
  3. mv zoo_sample.cfg zoo.cfg
  4. #修改dataDir配置为自定义目录
  5. vi zoo.cfg

启动zookeeper,进入zookeeper/bin目录 

我的建议是不要配置环境变量,因为zookeeper本来就是个特定场景下的组件,感觉没什么必要配置全局环境变量,我看很多博主都是直接告诉人家配置全部变量,没必要

  1. # 启动
  2. sh zkServer.sh start
  3. # 查看状态
  4. sh zkServer.sh status
  5. #停止
  6. sh zkServer.sh stop

执行查看状态,能看到下面信息则表示启动成功

Mode: standalone

4.安装Kafka

Kafka一样,2.5.1同样不支持jdk17(3.1.0开始才支持jdk17),本文选择的是jdk1.8

tar -zxvf kafka_2.12-2.5.1.tgz 解压完之后,进行配置修改

  1. # 编辑{安装位置}/config/server.properties文件
  2. # 修改server.properties文件中以下内容,配置kafka监听端口及ip地址
  3. listeners=PLAINTEXT://{安装机器IP}:9092
  4. advertised.listeners=PLAINTEXT://{安装机器IP}:9092
  5. # 修改zookeeper地址
  6. zookeeper.connect=127.0.0.1:2181/kafka
  7. #修改日志文件路径
  8. log.dirs=/tmp/kafka-logs

启动kafka

  1. #进入kafka/bin目录,执行以下命令
  2. nohup sh kafka-server-start.sh -daemon ../config/server.properties >>/dev/null 2>&1 &

5.安装logstash

注意,8.1.x版本的logstash需jdk17的环节;直接解压介质

tar -zxvf logstash-8.1.2-linux-x86_64.tar.gz 

到此安装已经完成了,哈哈,当然是开玩笑的,如果仅仅是安装,确实已经完成了,但是离我们想要的logstash还差一步:配置,具体配置请参考下面的日志分析章节,这里就不重复介绍了

6.安装file beat

file beat同logstash一样,这里暂时跳过,因为filebeat的配置要紧密结合业务来做对应配置的,所以等后面节合场景来具体说明

解决方案场景

本文所解决的业务场景就是:采集各应用产生的本地日志数据,进行汇总,可以进行应用日志查看,检索,链路分析,大致分为这几个步骤:1.日志采集;2.日志缓冲;3.分析日志;4.日志存储;5.日志查看。

1.日志采集

这章节的重点不是讲如何开发一个spring boot应用,而且从实际场景出发,一个真实的应用所产生的不同类型的日志是如何被file beat采集到的,从而完成日志中心的第一步:日志采集。

1.1 应用日志配置

在实际应用开发中,一个应用所产生的日志可能根据业务需求或者场景会分为很多种,需要记录的日志也随之有很多种,比如:应用程序本身日志-sys.log,应用的跟踪日志-trace.log,应用的sql日志-sql.log等等,这些日志根据类型不同所记录的日志内容肯定也是不一样的,那我们首先要做的第一步就是制定这些日志的格式,以便我们可以根据日志类型在后续logstash里面通过一系列filter生成不通的message和属性,最终生成不同的es索引存储在es中,方便我们做查询,当前这些都是后话,我们先把第一步搞定。

在这里,我们就假设应用需要生成3种类型日志,分别是:

app-sys.log:应用系统日志,记录程序本身的一些日志,包括spring框架、正常代码中自己记录的一些logger、异常等等信息

app-trace.log:应用内部service之间调用关系跟踪日志,记录service内部method的调用、出参入参等,方便查看应用本身的调用逻辑,以便能快速定位问题

app-dap-trace.log:微服务之间的调用日志,记录每个应用之间的调用,以便能查看一个业务场景的整个调用链,能快速知悉业务具体涉及的应用等。

根据这三种类型日志,那么就需要设计对应的日志记录的信息结构,那么我就根据日志的不通性质来定义一些通用的一些属性。当然,在实际开发中,每个人每个公司都有自己的业务,肯定跟着自己的业务来定,我这里只是做一个demo仅供参考。

既然设计已经初步完成了,那么接下来就是怎么实现了;在spring里面通常大家都是用logback来作为日志框架,那么我们就用logback来举例如何生成这三种类型的日志。

1.1.1 创建logback-spring.xml文件

logback文件主要是用来定义每个日志的结构、策略等等,在这里我就不对log back文件做详细的标签介绍了,以下logback-spring.xml 可直接复制即用,只需要修改里面的某些东西即可,具体见如下代码:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <configuration>
  3. <!-- Spring 自身日志配置 对应APP-SYS -->
  4. <springProperty scope="context" name="appCode" source="spring.application.name" defaultValue=""/>
  5. <springProperty scope="context" name="appInstId" source="application.app-inst-id" defaultValue=""/>
  6. <conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter"/>
  7. <conversionRule conversionWord="wex"
  8. converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter"/>
  9. <conversionRule conversionWord="wEx"
  10. converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/>
  11. <property name="APP_LOG_PATTERN_CONSOLE"
  12. value="${APP_LOG_PATTERN_CONSOLE:-%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}} %5p [${appCode:-},${appInstId:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-B3-ParentSpanId:-}] ${PID:- } --- [%t] %-40.40logger{39} : %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
  13. <property name="APP_LOG_PATTERN_FILE"
  14. value="${APP_LOG_PATTERN_FILE:-%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}} %5p [${appCode:-},${appInstId:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-B3-ParentSpanId:-}] ${PID:- } --- [%t] %-40.40logger{39} : %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>
  15. <property name="APP_LOG_LEVEL" value="${APP_LOG_LEVEL:-INFO}"/>
  16. <property name="APP_LOG_FILE_MAX_SIZE" value="${APP_LOG_FILE_MAX_SIZE:-100MB}"/>
  17. <property name="APP_LOG_FILE_MAX_HISTORY" value="${APP_LOG_FILE_MAX_HISTORY:-100}"/>
  18. <property name="APP_LOG_FILE_TOTAL_SIZE" value="${APP_LOG_FILE_TOTAL_SIZE:-0}"/>
  19. <property name="APP_LOG_FILE"
  20. value="./logs/app-sys.log}"/>
  21. <!--配置APP-SYS到控制台-->
  22. <appender name="APP_LOG_APPENDER_CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
  23. <encoder>
  24. <pattern>${APP_LOG_PATTERN_CONSOLE}</pattern>
  25. </encoder>
  26. </appender>
  27. <!--配置APP-SYS到文件-->
  28. <appender name="APP_LOG_APPENDER_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
  29. <encoder>
  30. <pattern>${APP_LOG_PATTERN_FILE}</pattern>
  31. </encoder>
  32. <file>${APP_LOG_FILE}</file>
  33. <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
  34. <fileNamePattern>${APP_LOG_FILE}.%d{yyyy-MM-dd}.%i</fileNamePattern>
  35. <maxFileSize>${APP_LOG_FILE_MAX_SIZE}</maxFileSize>
  36. <maxHistory>${APP_LOG_FILE_MAX_HISTORY}</maxHistory>
  37. <totalSizeCap>${APP_LOG_FILE_TOTAL_SIZE}</totalSizeCap>
  38. </rollingPolicy>
  39. </appender>
  40. <!-- 复制自 spring boot org/springframework/boot/logging/logback/defaults.xml -->
  41. <logger name="org.apache.catalina.startup.DigesterFactory" level="ERROR"/>
  42. <logger name="org.apache.catalina.util.LifecycleBase" level="ERROR"/>
  43. <logger name="org.apache.coyote.http11.Http11NioProtocol" level="WARN"/>
  44. <logger name="org.apache.sshd.common.util.SecurityUtils" level="WARN"/>
  45. <logger name="org.apache.tomcat.util.net.NioSelectorPool" level="WARN"/>
  46. <logger name="org.eclipse.jetty.util.component.AbstractLifeCycle" level="ERROR"/>
  47. <logger name="org.hibernate.validator.internal.util.Version" level="WARN"/>
  48. <!-- APP-TRACE 日志配置 -->
  49. <property name="APP_TRACE_LOG_PATTERN"
  50. value="[${appCode:-},${appInstId:-},%X{X-B3-TraceId:-}] [%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}}] %m%n"/>
  51. <property name="APP_TRACE_LOG_FILE" value="./logs/app-trace.log}"/>
  52. <property name="APP_TRACE_LOG_LEVEL" value="${APP_TRACE_LOG_LEVEL:-INFO}"/>
  53. <!--APP-TRACE to file-->
  54. <appender name="APP_TRACE_LOG_APPENDER_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
  55. <encoder>
  56. <pattern>${APP_TRACE_LOG_PATTERN}</pattern>
  57. <charset>UTF-8</charset>
  58. </encoder>
  59. <file>${APP_TRACE_LOG_FILE}</file>
  60. <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
  61. <fileNamePattern>${APP_TRACE_LOG_FILE}.%d{yyyy-MM-dd}.%i</fileNamePattern>
  62. <maxFileSize>${APP_TRACE_LOG_FILE_MAX_SIZE:-100MB}</maxFileSize>
  63. <maxHistory>${APP_TRACE_LOG_FILE_MAX_HISTORY:-100}</maxHistory>
  64. <totalSizeCap>${APP_TRACE_LOG_FILE_TOTAL_SIZE:-0}</totalSizeCap>
  65. </rollingPolicy>
  66. </appender>
  67. <!--DAP-TRACE 日志配置-->
  68. <conversionRule conversionWord="dapTraceJsonMsg"
  69. converterClass="com.testlog.wx.testspringlog.LogJSONMessageConverter"/>
  70. <property name="DAP_TRACE_LOG_FILE"
  71. value="./logs/app-dap-trace.log}"/>
  72. <property name="DAP_TRACE_LOG_LEVEL" value="${DAP_TRACE_LOG_LEVEL:-INFO}"/>
  73. <property name="DAP_TRACE_LOG_FILE_MAX_SIZE" value="${DAP_TRACE_LOG_FILE_MAX_SIZE:-100MB}"/>
  74. <property name="DAP_TRACE_LOG_FILE_MAX_HISTORY" value="${DAP_TRACE_LOG_FILE_MAX_HISTORY:-100}"/>
  75. <property name="DAP_TRACE_LOG_FILE_TOTAL_SIZE" value="${DAP_TRACE_LOG_FILE_TOTAL_SIZE:-0}"/>
  76. <!--DAP-TRACE to console-->
  77. <appender name="DAP_TRACE_LOG_APPENDER_CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
  78. <encoder>
  79. <pattern>%dapTraceJsonMsg</pattern>
  80. <charset>UTF-8</charset>
  81. </encoder>
  82. </appender>
  83. <!--DAP-TRACE to file-->
  84. <appender name="DAP_TRACE_LOG_APPENDER_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
  85. <encoder>
  86. <pattern>%dapTraceJsonMsg</pattern>
  87. <charset>UTF-8</charset>
  88. </encoder>
  89. <file>${DAP_TRACE_LOG_FILE}</file>
  90. <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
  91. <fileNamePattern>${DAP_TRACE_LOG_FILE}.%d{yyyy-MM-dd}.%i</fileNamePattern>
  92. <maxFileSize>${DAP_TRACE_LOG_FILE_MAX_SIZE}</maxFileSize>
  93. <maxHistory>${DAP_TRACE_LOG_FILE_MAX_HISTORY}</maxHistory>
  94. <totalSizeCap>${DAP_TRACE_LOG_FILE_TOTAL_SIZE}</totalSizeCap>
  95. </rollingPolicy>
  96. </appender>
  97. <!--日志开关-->
  98. <root level="${APP_LOG_LEVEL}">
  99. <appender-ref ref="APP_LOG_APPENDER_CONSOLE"/>
  100. <appender-ref ref="APP_LOG_APPENDER_FILE"/>
  101. </root>
  102. <logger name="app-trace" additivity="false" level="${APP_TRACE_LOG_LEVEL}">
  103. <appender-ref ref="APP_TRACE_LOG_APPENDER_FILE"/>
  104. </logger>
  105. <logger name="app-dap-trace" additivity="false" level="${DAP_TRACE_LOG_LEVEL}">
  106. <appender-ref ref="DAP_TRACE_LOG_APPENDER_CONSOLE"/>
  107. <appender-ref ref="DAP_TRACE_LOG_APPENDER_FILE"/>
  108. </logger>
  109. </configuration>
1.1.2 创建LoggerFactory

LoggerFactory的作用是为了给程序员更方便的创建不通的logger,我们在这里就封装app-trace和dap-trace logger的创建,app-sys用系统日志即可,无需额外创建,具体代码如下:

  1. package com.testlog.wx.testspringlog;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. /**
  5. * @Description TODO
  6. * @Author wx
  7. * @Date 2024/3/28
  8. */
  9. public class DemoTraceLoggerFactory {
  10. public static final String TRACE_LOGGER_NAME_PREFIX = "app-trace.";
  11. public static final String DAP_TRACE_LOGGER_NAME_PREFIX = "app-dap-trace.";
  12. public static Logger getTraceLogger(Class<?> clazz) {
  13. return LoggerFactory.getLogger(TRACE_LOGGER_NAME_PREFIX + clazz.getName());
  14. }
  15. public static Logger getDapTraceLogger(Class<?> clazz) {
  16. return LoggerFactory.getLogger(DAP_TRACE_LOGGER_NAME_PREFIX + clazz.getName());
  17. }
  18. }
1.1.3 trace日志的记录用法

app-trace的作用是用来记录应用内部不同service.method的调用链,因此我们编写一个demo,两个service来做一个演示具体app-trace的记录效果;

首先新建 IDemoService 以及实现类 DemoServiceImpl

IDemoService:

  1. package com.testlog.wx.testspringlog;
  2. /**
  3. * @Description TODO
  4. * @Author wx520
  5. * @Date 2024/3/28
  6. */
  7. public interface IDemoService {
  8. String sayHi(String name);
  9. }

DemoServiceImpl:

  1. package com.testlog.wx.testspringlog;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. /**
  7. * @Description TODO
  8. * @Author wx
  9. * @Date 2024/3/28
  10. */
  11. @Service
  12. public class DemoServiceImpl implements IDemoService {
  13. @Autowired
  14. private IDemoService2 demoService2;
  15. @Override
  16. public String sayHi(String name) {
  17. System.out.println(name);
  18. demoService2.test11();
  19. return "hello word";
  20. }
  21. }

再新建IDemoService2以及实现类DemoServiceImpl2

IDemoService2:

  1. package com.testlog.wx.testspringlog;
  2. /**
  3. * @Description TODO
  4. * @Author wx520
  5. * @Date 2024/3/28
  6. */
  7. public interface IDemoService2 {
  8. void test11();
  9. }

DemoServiceImpl2:

  1. package com.testlog.wx.testspringlog;
  2. import org.springframework.stereotype.Service;
  3. /**
  4. * @Description TODO
  5. * @Author wx
  6. * @Date 2024/3/28
  7. */
  8. @Service
  9. public class DemoServiceImpl2 implements IDemoService2 {
  10. @Override
  11. public void test11() {
  12. System.out.println("test");
  13. }
  14. }

建完service之后那么就开始写记录trace日志的逻辑,我们用aspect来对@Service做拦截,用@Aroud来进行记录请求在进入method之前和之后的日志信息,具体实现如下:

  1. package com.testlog.wx.testspringlog;
  2. import org.apache.commons.lang3.StringUtils;
  3. import org.aspectj.lang.ProceedingJoinPoint;
  4. import org.aspectj.lang.annotation.Around;
  5. import org.aspectj.lang.annotation.Aspect;
  6. import org.aspectj.lang.reflect.MethodSignature;
  7. import org.slf4j.Logger;
  8. import org.slf4j.MDC;
  9. import org.springframework.stereotype.Component;
  10. import java.util.UUID;
  11. /**
  12. * @Description TODO
  13. * @Author wx
  14. * @Date 2024/3/28
  15. */
  16. @Aspect
  17. @Component
  18. public class SpringTraceLoggerMethodInterceptor {
  19. private Logger traceLogger = DemoTraceLoggerFactory.getTraceLogger(getClass());
  20. @Around("@within(org.springframework.stereotype.Service)")
  21. public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
  22. Class clazz = joinPoint.getTarget().getClass();
  23. String beanClassName = clazz.getName();
  24. MethodSignature signature = (MethodSignature) joinPoint.getSignature();
  25. String methodName = signature.getName();
  26. String message = "Spring Bean " + beanClassName + "#" + methodName;
  27. String[] argNames = signature.getParameterNames();
  28. Object[] parameters = joinPoint.getArgs();
  29. traceLogger.info(entry(message, buildParams(argNames, parameters)));
  30. Object result = joinPoint.proceed();
  31. traceLogger.info(exit(message, buildResult(result)));
  32. return result;
  33. }
  34. private String buildResult(Object result) {
  35. StringBuffer sb = new StringBuffer();
  36. sb.append("{");
  37. sb.append("result:").append(JsonUtil.toJson(result));
  38. sb.append("}");
  39. return sb.toString();
  40. }
  41. private String buildParams(String[] argNames, Object[] parameters) {
  42. StringBuffer sb = new StringBuffer();
  43. sb.append("{");
  44. for (int i = 0; i < argNames.length; i++) {
  45. sb.append("" + argNames[i] + ":").append("" + JsonUtil.toJson(parameters[i]) + "");
  46. if (i != argNames.length - 1) sb.append(",");
  47. }
  48. sb.append("}");
  49. return sb.toString();
  50. }
  51. /**
  52. * 测试代码 写的不够严谨 不能作为正式开发使用
  53. *
  54. * @param message
  55. */
  56. private String entry(String message, String params) {
  57. String traceId = MDC.get("X-B3-TraceId");
  58. if (StringUtils.isEmpty(traceId)) {
  59. MDC.put("X-B3-TraceId", UUID.randomUUID().toString());
  60. }
  61. return getFormatSysLoggerMessages(message, params, " invoker start.", "Begin");
  62. }
  63. private String exit(String message, String params) {
  64. return getFormatSysLoggerMessages(message, params, " invoker end.", "End");
  65. }
  66. /**
  67. * 测试代码 写的不够严谨 不能作为正式开发使用
  68. *
  69. * @param message
  70. * @param params
  71. * @param action
  72. * @return
  73. */
  74. private String getFormatSysLoggerMessages(String message, String params, String action, String loggerType) {
  75. StringBuffer sb = new StringBuffer();
  76. sb.append("[");
  77. sb.append(loggerType).append("]");
  78. sb.append("[").append(message).append("]");
  79. sb.append("[").append(params).append("]");
  80. sb.append("[Spring Bean ").append(message).append(action).append("]");
  81. return sb.toString();
  82. }
  83. }

最后建一个controller用来做远程调用入口

  1. package com.testlog.wx.testspringlog;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.web.bind.annotation.GetMapping;
  4. import org.springframework.web.bind.annotation.PathVariable;
  5. import org.springframework.web.bind.annotation.RequestMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. import static org.springframework.http.MediaType.ALL_VALUE;
  8. /**
  9. * @Description TODO
  10. * @Author wx
  11. * @Date 2024/3/28
  12. */
  13. @RestController
  14. @RequestMapping(value = "/demo", consumes = ALL_VALUE)
  15. public class DemoController {
  16. @Autowired
  17. private IDemoService demoService;
  18. @GetMapping("/sayHi/{name}")
  19. public String sayHi(@PathVariable("name") String name) {
  20. return demoService.sayHi(name);
  21. }
  22. }

效果:

开发工具直接启动应用,我们会发现控制台输入的日志信息就跟我们在logback-spring.xml里面配置的app-sys日志格式是一样的,同时,在当前应用的根目录/logs目录下会生成一个app-sys.log的日志文件;

app-sys.log 控制台效果如下:

app-sys.log 文件内容效果如下:

那么我们接下来浏览器直接访问controller接口或者用其他工具都可,访问成功之后会在/logs目录生成app-trace.log日志文件,具体内容如下:

  1. [test-spring-log,,e00cc8a5-fcc0-46d5-b694-eccc8e129bc7] [2024-03-28 14:41:50.636] [Begin][Spring Bean com.testlog.wx.testspringlog.DemoServiceImpl#sayHi][{name:"aa"}][Spring Bean Spring Bean com.testlog.wx.testspringlog.DemoServiceImpl#sayHi invoker start.]
  2. [test-spring-log,,e00cc8a5-fcc0-46d5-b694-eccc8e129bc7] [2024-03-28 14:41:50.642] [Begin][Spring Bean com.testlog.wx.testspringlog.DemoServiceImpl2#test11][{}][Spring Bean Spring Bean com.testlog.wx.testspringlog.DemoServiceImpl2#test11 invoker start.]
  3. [test-spring-log,,e00cc8a5-fcc0-46d5-b694-eccc8e129bc7] [2024-03-28 14:41:50.645] [End][Spring Bean com.testlog.wx.testspringlog.DemoServiceImpl2#test11][{result:null}][Spring Bean Spring Bean com.testlog.wx.testspringlog.DemoServiceImpl2#test11 invoker end.]
  4. [test-spring-log,,e00cc8a5-fcc0-46d5-b694-eccc8e129bc7] [2024-03-28 14:41:50.645] [End][Spring Bean com.testlog.wx.testspringlog.DemoServiceImpl#sayHi][{result:"hello word"}][Spring Bean Spring Bean com.testlog.wx.testspringlog.DemoServiceImpl#sayHi invoker end.]
  5. [test-spring-log,,daa999c2-49d1-40e0-83c6-b33781926185] [2024-03-28 14:51:33.048] [Begin][Spring Bean com.testlog.wx.testspringlog.DemoServiceImpl#sayHi][{name:"aa"}][Spring Bean Spring Bean com.testlog.wx.testspringlog.DemoServiceImpl#sayHi invoker start.]
  6. [test-spring-log,,daa999c2-49d1-40e0-83c6-b33781926185] [2024-03-28 14:51:33.055] [Begin][Spring Bean com.testlog.wx.testspringlog.DemoServiceImpl2#test11][{}][Spring Bean Spring Bean com.testlog.wx.testspringlog.DemoServiceImpl2#test11 invoker start.]
  7. [test-spring-log,,daa999c2-49d1-40e0-83c6-b33781926185] [2024-03-28 14:51:33.059] [End][Spring Bean com.testlog.wx.testspringlog.DemoServiceImpl2#test11][{result:null}][Spring Bean Spring Bean com.testlog.wx.testspringlog.DemoServiceImpl2#test11 invoker end.]
  8. [test-spring-log,,daa999c2-49d1-40e0-83c6-b33781926185] [2024-03-28 14:51:33.059] [End][Spring Bean com.testlog.wx.testspringlog.DemoServiceImpl#sayHi][{result:"hello word"}][Spring Bean Spring Bean com.testlog.wx.testspringlog.DemoServiceImpl#sayHi invoker end.]

你会发现我们日志中一个完整的traceId包含四条日志信息:service1.sayHi begin---》service2.test11 begin ---->service2.tes11 end ---> service1.sayHi end;这就是app-trace日志记录的一个请求的一条完成trace链路。

1.1.4 dap-trace日志的记录用法

dap-trace比较特殊,因为它的作用是用来记录每个微服务实例之间的调用链路的,本来考虑这里加入sky walking一起讲的,但是感觉这样篇幅就收不住了就打住了,后期有时间会整理一套应用节合sky walking以及节合业务属性来做追踪的方案,因此这里就直接使用sleuth来做trace跟踪,

还有一个需要注意的地方就是:前面两种日志都是自定义的常规格式,因此dap-trace日志楼主就想用个特殊格式-json 来做记录,这样也方便大家有这个需求:怎么用logback记录json格式的日志,什么格式这里都有,无需再去查找资料了

pom文件添加sleuth依赖:版本自定 无需跟本文一致

  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-sleuth</artifactId>
  4. <version>3.1.7</version>
  5. </dependency>

既然是记录dap-trace日志,那么肯定是要在请求进来应用后以及响应之前做日志记录,那么楼主推荐大家使用spring的OncePreRequestFilter来做拦截,这个filter的作用和原理我就不说了,望名知意,具体大家可以去官网看文档介绍,代码如下:

  1. package com.testlog.wx.testspringlog;
  2. import org.slf4j.Logger;
  3. import org.slf4j.MarkerFactory;
  4. import org.springframework.core.Ordered;
  5. import org.springframework.stereotype.Component;
  6. import org.springframework.web.filter.OncePerRequestFilter;
  7. import org.springframework.web.servlet.HandlerMapping;
  8. import javax.servlet.FilterChain;
  9. import javax.servlet.ServletException;
  10. import javax.servlet.http.HttpServletRequest;
  11. import javax.servlet.http.HttpServletResponse;
  12. import java.io.IOException;
  13. import java.util.HashMap;
  14. import java.util.Map;
  15. import java.util.concurrent.TimeUnit;
  16. /**
  17. * @Description TODO
  18. * @Author wx
  19. * @Date 2024/3/28
  20. */
  21. @Component
  22. public class DapTraceLoggerFilter extends OncePerRequestFilter implements Ordered {
  23. private Logger dapTraceLogger = DemoTraceLoggerFactory.getDapTraceLogger(getClass());
  24. @Override
  25. public int getOrder() {
  26. return HIGHEST_PRECEDENCE + 10;
  27. }
  28. @Override
  29. protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
  30. //这里用来记录dap-trace
  31. long start = System.nanoTime();
  32. logRequest(request);
  33. filterChain.doFilter(request, response);
  34. logResponse(start, response, request);
  35. }
  36. private void logResponse(long start, HttpServletResponse response, HttpServletRequest request) {
  37. Map<String, Object> map = new HashMap<>();
  38. map.put("comp", "SPRING_WEB");
  39. map.put("r_event", "SS");
  40. map.put("hsc", response.getStatus());
  41. map.put("api_path", request.getAttribute(HandlerMapping.BEST_MATCHING_PATTERN_ATTRIBUTE));
  42. map.put("etime", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
  43. dapTraceLogger.info(MarkerFactory.getMarker("//@args-json-formatter"), null, map);
  44. }
  45. private void logRequest(HttpServletRequest request) {
  46. Map<String, Object> map = new HashMap<>();
  47. map.put("comp", "SPRING_WEB");
  48. map.put("r_event", "SR");
  49. map.put("hurl", request.getRequestURL().toString());
  50. map.put("hm", request.getMethod());
  51. map.put("hra", request.getRemoteAddr());
  52. //使用该Marker时, 如果日志参数里有Map类型参数, 则会将Map变成json结构
  53. dapTraceLogger.info(MarkerFactory.getMarker("//@args-json-formatter"), null, map);
  54. }
  55. }

因为是需要json格式的日志,楼主这里就没有用常规方式去配置,也就是在logback里面配置一大堆属性,这个方式我感觉有点不方便,既然是json,那属性肯定都是动态或者自定义的,方便后期扩展,否则每次新增属性都要去修改logback就很麻烦,不利于迭代和维护,具体代码如下:

  1. package com.testlog.wx.testspringlog;
  2. import ch.qos.logback.classic.pattern.ThrowableHandlingConverter;
  3. import ch.qos.logback.classic.spi.ILoggingEvent;
  4. import org.slf4j.MDC;
  5. import org.slf4j.MarkerFactory;
  6. import java.text.SimpleDateFormat;
  7. import java.util.Date;
  8. import java.util.HashMap;
  9. import java.util.Map;
  10. /**
  11. * @Description TODO
  12. * @Author wx
  13. * @Date 2024/3/28
  14. */
  15. public class LogJSONMessageConverter extends ThrowableHandlingConverter {
  16. private static final ThreadLocal<SimpleDateFormat> SDF = new ThreadLocal<SimpleDateFormat>() {
  17. protected SimpleDateFormat initialValue() {
  18. return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
  19. }
  20. };
  21. @Override
  22. public String convert(ILoggingEvent le) {
  23. Map<String, Object> map = new HashMap<>();
  24. String traceId = MDC.get("traceId");
  25. String spanId = MDC.get("spanId");
  26. String psid = MDC.get("parentId");
  27. //skywalking 相关代码暂时去掉 有感兴趣的同学也可以自己打开注释
  28. // String swTraceId = MDC.get("swTraceId");
  29. // if (StringUtils.isEmpty(swTraceId) || StringUtils.equals("N/A", swTraceId)) {
  30. // swTraceId = TraceContext.traceId();
  31. // }
  32. map.put("time", SDF.get().format(new Date(le.getTimeStamp())));
  33. map.put("app", LoggerProperties.INSTANCE.getAppCode());
  34. map.put("tid", traceId);
  35. map.put("sid", spanId);
  36. map.put("psid", psid);
  37. // map.put("sw_tid", swTraceId);
  38. if (le.getMarker() == MarkerFactory.getMarker("//@args-json-formatter")) {
  39. Object[] array = le.getArgumentArray();
  40. if (array != null && array.length > 0) {
  41. for (int i = 0, len = array.length; i < len; i++) {
  42. Object item = array[i];
  43. // 只处理 Map类型 这个根据自己的模型来就是了
  44. if (item instanceof Map) {
  45. for (Map.Entry<?, ?> entry : ((Map<?, ?>) item).entrySet()) {
  46. map.put(String.valueOf(entry.getKey()), entry.getValue());
  47. }
  48. }
  49. }
  50. }
  51. }
  52. return JsonUtil.toJson(map) + System.lineSeparator();
  53. }
  54. }

效果:

同样的,访问刚刚的controller接口,会在控制台以及/logs目录生成app-dap-trace.log日志文件,具体效果如下:

  1. {"app":"test-spring-log","comp":"SPRING_WEB","sw_tid":"29dd2d14d50a4be2ac2042b658b838e6.46.17116086928750001","r_event":"SR","hm":"GET","hurl":"http://localhost:8080/demo/sayHi/aa","hra":"0:0:0:0:0:0:0:1","time":"2024-03-28 14:51:32.907","tid":"60c5cfdc3694eb1b","sid":"60c5cfdc3694eb1b"}
  2. {"app":"test-spring-log","comp":"SPRING_WEB","sw_tid":"29dd2d14d50a4be2ac2042b658b838e6.46.17116086928750001","r_event":"SS","etime":183,"hsc":200,"time":"2024-03-28 14:51:33.089","api_path":"/demo/sayHi/{name}","tid":"60c5cfdc3694eb1b","sid":"60c5cfdc3694eb1b"}

同样的,一次请求会记录SR和SS两条日志,全局跟踪链的话就用tid(也就是traceId)来做跟踪。

自此,应用日志配置已经全部完成,接下来就是日志到底如何采集的。

1.2 应用日志采集

日志采集这块就交给file beat了,但是我们有三套日志文件,想要file beat能识别不同格式的日志以便能正确采集到日志信息,就得需要对应配置好file beat,,进入{安装目录}/,新建test-filebeat.yml配置文件进行配置,具体配置如下:

  1. filebeat.inputs:
  2. # -----
  3. # app-dap-trace-log
  4. # -----
  5. - type: filestream
  6. id: app-dap-trace-log-id
  7. enabled: true
  8. paths:
  9. - /opt/elk/test-logs/app-dap-trace.log*
  10. - /opt/elk/test-logs/app-dap-trace.log*
  11. - /opt/elk/test-logs/app-dap-trace.log*
  12. - /opt/elk/test-logs/app-dap-trace.log*
  13. encoding: UTF-8
  14. fields:
  15. app-topic-name: app-log
  16. app-log-type: app-dap-trace-log
  17. prospector.scanner.exclude_files: [".gz$"]
  18. parsers: #因为我们的dap-trace日志是json格式的,所以需要用的file beat的ndjson来做解析
  19. - ndjson:
  20. target: ""
  21. add_error_key: true
  22. overwrite_keys: true
  23. # -----
  24. # app-sys-log
  25. # -----
  26. - type: filestream
  27. id: app-sys-log-id
  28. enabled: true
  29. paths:
  30. - /opt/elk/test-logs/app-sys.log*
  31. - /opt/elk/test-logs/app-sys.log*
  32. - /opt/elk/test-logs/app-sys.log*
  33. encoding: UTF-8
  34. fields:
  35. app-topic-name: app-log
  36. app-log-type: app-sys-log
  37. prospector.scanner.exclude_files: [".gz$"]
  38. parsers:
  39. - multiline:
  40. type: pattern
  41. pattern: '^\d{4}\-\d{2}\-\d{2}'
  42. negate: true
  43. match: after
  44. max_lines: 500
  45. # -----
  46. # app-trace-log
  47. # -----
  48. - type: filestream
  49. id: app-log-id
  50. enabled: true
  51. paths:
  52. - /opt/elk/test-logs/app-trace.log*
  53. encoding: UTF-8
  54. fields:
  55. app-topic-name: app-log
  56. app-log-type: app-trace-log
  57. prospector.scanner.exclude_files: [".gz$"]
  58. parsers:
  59. - multiline:
  60. type: pattern
  61. pattern: '^\[[^,]+,[^,]+,'
  62. negate: true
  63. match: after
  64. max_lines: 500
  65. # -----
  66. # output
  67. # -----
  68. output.kafka:
  69. enabled: true
  70. hosts: ["localhost:9092"]
  71. topic: '%{[fields.app-topic-name]}'
  72. max_retries: 3
  73. bulk_max_size: 2048
  74. timeout: 30
  75. broker_timeout: 10
  76. channel_buffer_size: 256
  77. keep_alive: 60
  78. compression: gzip
  79. max_message_bytes: 100000000
  80. required_acks: 1
  81. client_id: 'app-filebeats'
  82. # -----
  83. # 8.x版本logstash会用event做字段,冲突 不想改原本的日志结构 这里做一下rename
  84. # -----
  85. processors:
  86. - rename:
  87. fields:
  88. - from: "event"
  89. to: "r_event"
  90. ignore_missing: false
  91. fail_on_error: true

配置解析:

官方文档: Configure inputs | Filebeat Reference [8.12] | Elastic  版本选择8.1.2

scope属性说明
inputs配置typefilestream老版本为log,新版本统一用filestream
id自定义多个filestream需要用id来区分
paths路径配置当前filestream要采集的日志路径,数组格式,可配置多个
fields字段可自定义添加当前采集的数据的字段属性,比如本文中用添加了app-log-type和app-topic-name用来给后续用
parsersndjson解析器 可多配
target解析json的时候放在哪个属性下,为空就是放在跟目录,老版本是keys_under_root
multiline处理多行数据
typepattern正则匹配多行数据
pattern正则表达式匹配上的数据才会采集
outputkafka输出
topic输出到kafka的topic节合业务定义对应的topic,logstash那边读取对应的topic数据,各行其职
processorsrename修改属性字段名称
fields
from原属性
to目标属性

 在这里为什么要单独列一下processors配置呢,因为开头也说了,楼主因为是升级ELK,但是应用日志模板升级的时候你肯定是不能改的,一改,日志结构就没法兼容了,所以大家也有可能会遇到楼主同样的问题,因为之前楼主公司的trace日志里面有个属性叫:event,是String,记录是输入还是输出类型的,但是8.1.x版本的logstash会把event当作它自己的属性,而且还是个Object类型,一升级,就会导致日志在logstash那边没办法识别,从而存进es的时候就会发现存的索引不对,这个时候file beat的这个rename处理就有大作用了,我可以不用修改应用的日志结构从而兼容升级之前的日志数据,而且file beat还有很多其他的处理器,大家在遇到问题的时候建议多去看看官方文档找找,总有解决办法的。

到此,日志采集环节就已经完成了,接下来就是日志缓冲。

2.日志缓冲

这里加上kafka的主要作用是用来做缓冲,不过这一块看具体业务,有些业务量小的,数据没那么夸张就感觉没必要上,那有些业务非常频繁,数据量很大,如果直接从beat到logstash很大概率会讲logstash压垮,或者是导致logstash非常吃资源,这样就有点得不偿失。

说白了,日志数据并不像其他业务数据一样需要时效性以及100%准确性,只需要在特定的时间完成处理即可,所以kafka就派上用场了。

在这一环节,大家回过头来看上面file beat的配置,大家会发现我们配置了一个自定义的fields:app-topic-name,所以在kafka这里我们也要将这个topic创建出来,具体命令如下:

  1. #进入到kafka安装目录/bin目录下执行命令
  2. ./kafka-topics.sh --create --zookeeper 127.0.0.1:2181/kafka --replication-factor 1 --partitions 1 --topic app-log

3.日志分析

现在整个ELK环节从file beat采集--->kafka缓冲--->已经来到了logstash分析整理了,logstash在这里的主要作用就是通过读取kafka推送过来的日志数据,然后分析日志结构,整理成我们和es都想要的格式存进es,进入到{安装目录}/config目录,新建一个test-logstash.conf文件,然后进行配置,具体配置如下:

  1. input {
  2. kafka {
  3. bootstrap_servers => ["127.0.0.1:9092"]
  4. topics_pattern => "app-.*"
  5. group_id => "app-logstash"
  6. client_id => "app-logstash"
  7. auto_offset_reset => "earliest"
  8. codec => json {
  9. charset => "UTF-8"
  10. }
  11. consumer_threads => 3
  12. decorate_events => true
  13. }
  14. }
  15. filter {
  16. # -----
  17. # pre process [fields][app-log-type]
  18. # -----
  19. if ![fields][app-log-type] and [log][file][path] {
  20. grok {
  21. match => { "[log][file][path]" => "%{GREEDYDATA}(\\|\/)%{DATA:log-file-name}\."}
  22. }
  23. mutate {
  24. add_field => { "[fields][app-log-type]" => "%{[log-file-name]}-log" }
  25. }
  26. }
  27. # -----
  28. # parse log message as json field
  29. # -----
  30. # app-sys-log is log generated by app8
  31. if [fields][app-log-type] =~ "app-sys-log" {
  32. grok {
  33. match => { "message" => "%{TIMESTAMP_ISO8601:time}\s+%{LOGLEVEL:level}\s+\[%{DATA:app},%{DATA:inst},%{DATA:tid},%{DATA:sid},%{DATA:psid}\]%{GREEDYDATA:msg}" }
  34. }
  35. }
  36. else if [fields][app-log-type] =~ "app-trace-log" {
  37. grok {
  38. match => { "message" => "\[%{DATA:app},%{DATA:inst},%{DATA:tid}\]\[%{TIMESTAMP_ISO8601:time}\]%{GREEDYDATA:msg}" }
  39. }
  40. }
  41. # -----
  42. # replace logstash @timestamp with log timestamp, and output.elasticsearch create index use this time
  43. # -----
  44. date {
  45. match => [ "time", "yyyy-MM-dd HH:mm:ss.SSS", "ISO8601" ]
  46. locale => "cn"
  47. timezone => "Asia/Shanghai"
  48. }
  49. }
  50. output {
  51. elasticsearch {
  52. hosts => ["127.0.0.1:9200"]
  53. index => "%{[fields][app-log-type]}-%{+YYYY.MM.dd}"
  54. }
  55. }

这里为什么只做了两种日志type的分析,因为dap-trace本身就是json格式的,无需再处理了,而其他两种的日志为什么不也生成json格式的呢,这就涉及到一个开发习惯了,相信绝大数人看日志的话肯定还是习惯于像spring框架产生的那种格式的日志,因为大多数人第一次接触开发,从成功启动第一个应用到看到控制台输出日志,看到的日志就是类似 

2024-03-28 10:21:38.803  INFO [test-spring-log,,,,] 20792 --- [main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 8080 (http)

这种格式的,这种格式日志也方便大家查看信息定位问题,尤其是堆栈信息,而dap-trace日志不同,它是记录应用之间的调用链的,一般这种日志都是通过第三方工具或者节合业务需求来查看的,所以一般都是记录的json格式数据。

在这里做日志分析的目的就是从日志数据中解析出我们想要的属性,这样存进es的时候方便后续通过某些自定义的一些业务属性进行条件查询,比如,按照日志记录时间查询某时间段的sys日志;比如按照应用appCode来查找不同应用的sys日志,如果你只是一股脑把数据放进es,查是能查,查出来的数据就不一定是你想要的了,就不贴合业务场景了

这里我就不过多介绍logstash配置文件里的那些配置具体作用了,感兴趣的同学可去官方文档查看: Logstash Reference [8.13] | Elastic 

4.日志存储

这一环节有两个方面需要思考:

1.怎么存

2.怎么查

在上一个环节:日志分析章节我们可以看到,我们根据日志的类型去单独解析出具体日子内容,然后解析出来我们需要的属性,然后在根据日志生成的时间+日志类型作为es的索引构成一个完整的日志索引模型,那这个环节我们要做的就是匹配logstash过来的数据,然后按照我们设计好的日志索引模板来存储索引数据,所以我们首先要做的就是:制定好对应的日志类型的索引模板;那么索引模板定好了,所有的数据都会按照我们设计的模型存储到es中,查自然就不在话下了。

针对我们设计的三种日志类型我分别设计了三个对应的日志索引模板,具体如下:

app-sys、app-trace日志索引模板:

  1. {
  2. "index_patterns": ["*app-trace-log*","*app-sys-log*"],
  3. "priority": 1,
  4. "template": {
  5. "mappings": {
  6. "properties": {
  7. "time": {
  8. "type": "date",
  9. "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
  10. },
  11. "app": {
  12. "type": "keyword"
  13. },
  14. "inst": {
  15. "type": "keyword"
  16. },
  17. "tid": {
  18. "type": "keyword"
  19. },
  20. "sid": {
  21. "type": "keyword"
  22. },
  23. "psid": {
  24. "type": "keyword"
  25. }
  26. }
  27. }
  28. }
  29. }

app-dap-trace日志索引模板:

  1. {
  2. "index_patterns": ["*app-dap-trace-log*"],
  3. "priority": 3,
  4. "template": {
  5. "mappings": {
  6. "properties": {
  7. "time": {
  8. "type": "date",
  9. "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
  10. },
  11. "app": {
  12. "type": "keyword"
  13. },
  14. "inst": {
  15. "type": "keyword"
  16. },
  17. "tid": {
  18. "type": "keyword"
  19. },
  20. "sid": {
  21. "type": "keyword"
  22. },
  23. "psid": {
  24. "type": "keyword"
  25. },
  26. "comp": {
  27. "type": "keyword"
  28. },
  29. "r_event": {
  30. "type": "keyword"
  31. },
  32. "sw_tid": {
  33. "type": "keyword"
  34. },
  35. "api_path": {
  36. "type": "keyword"
  37. }
  38. }
  39. }
  40. }
  41. }

接下来就是让日志模板在es生效,将上述模板分别保存为app-sys-trace-log-template.json、app-dap-trace-log-template.json文件,然后将文件上传至es安装目录,然后执行如下命令:

  1. curl -H "Content-Type:application/json" -XPUT 'http://ip:9200/_index_template/app-sys-trace-log' -d @app-sys-trace-log-template.json
  2. curl -H "Content-Type:application/json" -XPUT 'http://ip:9200/_index_template/app-dap-trace-log' -d @app-dap-trace-log-template.json

命令解析:

1.  创建索引模板的接口格式是ip:port/_index_template/{模板名称},模板名称可换成你们自定义的名称。

2.  -d @后面的是对应的模板文件名称,我这个命令是在模板所在的目录执行的,如果你是要在其他地方执行,请带上完整路径。

3. 模板里面的priority属性一定要注意:多个模板存在的情况,一定要修改值,不能存在相同priority的模板,这个属性其实就是order的作用,新版本es不允许模板的order存在相同配置。

或者可以直接用postman请求es创建索引模板接口也行,接口还是上面命令里面的接口,body直接复制上面模板内容即可。

创建完模板之后,验证一下我们刚刚创建的索引有没有成功,请求接口:http://ip:9200/_index_template/{模板名称} 查询索引模板的接口,然后查看有没有我们刚刚创建的索引模板,如下所示:

或者查看响应:{"acknowledged":true} 则表示成功

5.小总结

到这里,我们再来捋一下整体流程:

1.首先在开发之前先大家讨论制定好日志类型、日志格式

2.接下来就是定好日志框架,包括日志配置、日志的输入输出等

3.根据前两步对file beat进行配置,能够正确采集到我们应用产生的日志文件内的数据

4.file beat配置好日志特定的topic,然后采集的数据输出到kafka

5.kafka进行日志专属topic的创建

6.logstash通过读取kafka-日志topic中的日志数据对日志进行分析、整理,最后生成es索引所需要的特定格式数据,并输出到es中

7.es制定好日志对应所需要的索引模板,然后创建模板,等待数据从logstash输入。

捋完流程我们再来回过头来将流程跑一遍看看效果如何,日志是否能按照我们所设计的完美进入es中,请按照以下流程来启动各组件(ps:因为各个组件有严格的上下游关系,所以我们一般都是从最底层开始);

1.启动我们的应用,确保有日志输出(随便访问下接口让日志产生)

2.check一下es有没有添加我们的索引模板,check方式请看上面,索引模板ok的话,我们启动es,执行如下命令:

  1. # 进入{es安装目录/bin目录}
  2. nohup sh elasticsearch -d >> ../logs/out.log 2>&1 &

3.启动zookeeper,执行如下命令:

  1. # j进入{安装目录}/bin目录 启动
  2. sh zkServer.sh start
  3. # 查看状态
  4. sh zkServer.sh status
  5. #停止
  6. sh zkServer.sh stop

4.启动kafka,执行如下命令:

  1. #启动kafka
  2. nohup sh kafka-server-start.sh -daemon ../config/server.properties >>/dev/null 2>&1 &
  3. ## 初始化topic
  4. ./kafka-topics.sh --create --zookeeper 127.0.0.1:2181/kafka --replication-factor 1 --partitions 1 --topic app-log

5.启动logstash,执行如下命令

  1. #进入{安装目录}/bin目录
  2. nohup sh logstash -f ../config/test-logstash.conf >> ../logs/out.log 2>&1 &

6.启动file beat,执行如下命令

  1. #进入{安装目录}
  2. nohup ./filebeat -e -c test-filebeat.yml >/dev/null 2>&1 &

7.验证日志是否正确输出到es

先查看是否有正确按照我们的配置生成索引,请浏览器直接访问接口:

http://ip:9200/_cat/indices?v ,不出意外的话我们就能看到返回的数据有我们定义的索引数据存在。

然后我们查看一下具体某个索引里面的数据,看下结构数据跟我们设定的模板对不对的上,

用远程工具,比如postman访问接口:

post请求:http://ip:9200/{索引名称}/_search?ignore_unavailable=true ,正常情况下会返回如下格式的数据:

  1. {
  2. "took": 1,
  3. "timed_out": false,
  4. "_shards": {
  5. "total": 1,
  6. "successful": 1,
  7. "skipped": 0,
  8. "failed": 0
  9. },
  10. "hits": {
  11. "total": {
  12. "value": 5760,
  13. "relation": "eq"
  14. },
  15. "max_score": 1.0,
  16. "hits": [
  17. {
  18. "_index": "app-dap-trace-log-2024.03.02",
  19. "_id": "J1DEQI4BgHvBZxO0CTeC",
  20. "_score": 1.0,
  21. "_ignored": [
  22. "event.original.keyword"
  23. ],
  24. "_source": {
  25. "input": {
  26. "type": "filestream"
  27. },
  28. "hra": "127.0.0.1",
  29. "host": {
  30. "name": "node-127-0-0-1"
  31. },
  32. "hurl": "http://127.0.0.1:14083/api/demo/aa",
  33. "@version": "1",
  34. "sw_tid": "",
  35. "comp": "SPRING_WEB",
  36. "inst": "127.0.0.1:test-spring-log:14083",
  37. "app": "test-spring-log",
  38. "ecs": {
  39. "version": "8.0.0"
  40. },
  41. "r_event": "SR",
  42. "@timestamp": "2024-03-02T17:57:05.056Z",
  43. "hm": "POST",
  44. "log": {
  45. "offset": 415082,
  46. "file": {
  47. "path": "/opt/elk/test-logs/app-dap-trace.log.2024-03-03.0"
  48. }
  49. },
  50. "agent": {
  51. "name": "node-127-0-0-1",
  52. "version": "8.1.2",
  53. "ephemeral_id": "5ff7d980-129a-4af5-bade-85c656a63fec",
  54. "id": "d72a0d75-7491-4ebf-9bdd-f461f84dd7bc",
  55. "type": "filebeat"
  56. },
  57. "event": {
  58. "original": "{\"@timestamp\":\"2024-03-15T06:18:58.222Z\",\"@metadata\":{\"beat\":\"filebeat\",\"type\":\"_doc\",\"version\":\"8.1.2\"}}"
  59. },
  60. "time": "2024-03-03 01:57:05.056",
  61. "sid": "2bdf6a3a87116341",
  62. "tid": "2bdf6a3a87116341",
  63. "fields": {
  64. "app-topic-name": "app-log",
  65. "app-log-type": "app-dap-trace-log"
  66. }
  67. }
  68. }
  69. ]
  70. }
  71. }

其中,hits.hits._source中的数据就是所存进去的该类型的日志数据以及结构,我们可以看到_source下面的一些key都是我们在模板里面定好的,没有定的就是logstash本身的属性字段,这就是一个我们所需要的一个完整的es索引结构,将来做条件查询的时候这些字段会有大用处。

如果你按照本文来一步一步走下来,得到的数据结构和上面的示例一样,就说明你已经成功了。

6.日志查看

这一块其实有点不大好讲,因为涉及到实际业务场景,每个人所运用的场景都不一样,我就按照我现在做的来给大家讲一讲思路,归根结底,日志中心最后展现给人的就是这个环节:日志查看。用户也是最关心这一点,其他的包括采集、存储什么什么的用户是不关心的,他们只需要一个符合他们的结果就行,其他你们程序员搞定就行(~!~),所以日志查看最核心的就是节合业务场景,那本文所采用的方案就是将业务字段存储进es索引中,节合日志查看系统进行业务场景条件查询日志,这样的话日志和业务就能紧密节合在一起了,如何将业务字段存储进es中,前面已经很详细的描述了,接下来就是如何查询,这里我只是讲一下大致的实现思路细节,给大家提供一个可参考的方向,具体如下:

Rest apis

es已经提供了一套rest api,无论大家是使用java es client还是其他方式,底层都是rest api,所以直接使用rest api,再节合业务场景进行查询es即可,我就列举一些常见的查询示例

根据索引名称条件查询日志信息

POST:http://10.16.16.103:9200/{index_name}/_search 

body:

  1. {
  2. "from" : 0, "size" : 300,
  3. "query":{"bool":{"filter":
  4. [
  5. {"range":{"time":{"gte":"2024-04-02 10:31:23.000", "lt":"2024-04-03 10:31:23.000"}}}
  6. , {"term": {"app": "appName"}}
  7. , {"term": {"inst": "应用实例id"}} //todo 也可以加其他业务字段 只要是你日志里面对应有的
  8. ]
  9. }},
  10. "sort": { "time": { "order": "asc" }}
  11. }

这样我们就可以在body里面自定义添加我们业务上需要的查询条件进行search,日志系统也可以自行节合业务场景然后封装自己的rest api进行查询即可,虽然现在es也自己提供了java client,但是我感觉这个适合那种场景:应用需要自行操作es,比如将es当作应用本身的存储来交互;这种的就比较适合使用client来进行操作,但是日志系统并不需要其他的,只需要查询而已,所以为了不强依赖es,我们可以节合业务场景自行封装rest api。

自此,一个完整的springboot整合ELK的日志中心DEMO方案就算是初步完成了,如有更好的方案,欢迎交流,点个赞吧~

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/384588
推荐阅读
相关标签
  

闽ICP备14008679号