当前位置:   article > 正文

全链路日志追踪traceId(http、dubbo、mq)_链路追踪traceid

链路追踪traceid

一、概述

1、目的

目前许多系统项目之间的调用,有基于微服务的,有通过HTTP请求的,还有通过mq的。那么在处理一次请求的时候,可能会调用多个服务或者调用多个其他系统功能,这样就会产生很多的日志,此时,如果想查看这一次调用的完整的请求链路的日志时,就会变得比较困难,虽然我们有一些集中的日志收集工具比如ELK,我们需要把这一些日志串联起来,这个问题很关键,因为如果没有串联起来,排查日志就是一件很困难的事情。

 

2、解决办法

一般来说,系统之间的调用不外乎两种:http和dubbo。当然还有通过中间件mq的形式(SpringCloud也是基于MVC容器调用),系统内部有task定时任务等。

对于http请求:我们的做法是在最开始请求系统时候生成一个全局唯一的TraceID,放在http 请求header中,系统接收到请求后,从header中取出这个TraceID,放入MDC中,这个TraceID伴随着这整个请求的调用周期,即当一个服务调用另外一个服务的时候,需要往下传递,从而形成一条链路,这样当我们查看日志时,只需要根据关键搜索这个TraceID,整条调用链路的日志都可以查出来。

对于dubbo请求:我们可以在最开始请求的时候生成一个全局唯一性TraceID,放入RpcContext中,但是有这么一个问题,RpcContext只能做到消费者和提供者共享同一个RpcContext,假设我有一种调用关系,serverA -> serverB -> serverC,A->B的时候可以获取相同内容的RpcContext,但是B->C时候,A和C就无法共享相同内容的RpcContext了,对于这种情况我们利用ThreadLocal进行解决,我们先从RpcContext取出TraceID,放入ThreadLocal中,当调用别的服务再放入RpcContext中即可。当然利用MDC可以实现。

对于系统内部的task任务:同样在每个task开始执行之前系统内部生成一个全局唯一TraceID,然后把TraceID放入MDC中。

对于mq:与上面相同,在每个MQ listener处理数据开始之前调用一个共公方法生成一个全局唯一TraceID,然后把TraceID放入MDC中。

经过这些处理之后,我们在打印日志的时候只需要从MDC取出TraceID打印出来即可。

 

3、适用范围

语言环境:Java

项目架构:SSM项目。SpringBoot项目,dubbo服务,SpringCloud微服务

日志框架:log4j,logback

 

二、代码示例

1、Dubbo请求:

自定义一个Filter,实现com.alibaba.dubbo.rpc.Filter即可
Comsumer:

  1. @Slf4j
  2. @Activate(group = Constants.CONSUMER)
  3. public class ComsumerFilter implements Filter {
  4. public final static String TRACE_ID = "TraceID";
  5. public final static String MDC_UUID = "mdc_uuid";
  6. @Override
  7. public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
  8. String interfaceName = invoker.getInterface().getName();
  9. String methodName = invocation.getMethodName();
  10. String REQ_SERVICE = interfaceName + "." + methodName;
  11. String SOURCE_IP = RpcContext.getContext().getRemoteAddressString();
  12. String traceID = RpcContext.getContext().getAttachment(TRACE_ID);
  13. if (StringUtils.isBlank(traceID)) {
  14. traceID = UUID.randomUUID().toString().replace("-", "");
  15. }
  16. RpcContext.getContext().setAttachment(TRACE_ID, traceID);
  17. MDC.put(MDC_UUID, traceID);
  18. long startTime = System.currentTimeMillis();
  19. Result result = null;
  20. try {
  21. result = invoker.invoke(invocation);
  22. if (result.hasException()) {
  23. log.error("Consumer.调用dubbo服务发生异常:called by [{}] service [{}] method [{}] fail",
  24. SOURCE_IP, interfaceName, methodName);
  25. log.error("Consumer.TraceFilter occurs exception", result.getException());
  26. }
  27. } catch (Exception e) {
  28. log.error("Consumer.dubbo异常:traceID:[{}] called by [{}] service [{}] method [{}] exception [{}] ",
  29. traceID, SOURCE_IP, invoker.getInterface().getName(), invocation.getMethodName(),
  30. e.getClass().getName() + e.getMessage());
  31. } finally {
  32. log.info("Consumer.dubbo返回:traceID:[{}] 消费方 [{}] 接口 [{}] 返回值 [{}] 耗时 {} 毫秒",
  33. traceID, SOURCE_IP, REQ_SERVICE, JSON.toJSONString(result), System.currentTimeMillis() - startTime);
  34. }
  35. return result;
  36. }
  37. }

Provider:

  1. @Slf4j
  2. @Activate(group = Constants.PROVIDER)
  3. public class ProviderFilter implements Filter {
  4. public final static String TRACE_ID = "TraceID";
  5. public final static String MDC_UUID = "mdc_uuid";
  6. @Override
  7. public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
  8. String traceID = MDC.get(MDC_UUID);
  9. String interfaceName = invoker.getInterface().getSimpleName();
  10. String methodName = invocation.getMethodName();
  11. String REQ_SERVICE = interfaceName + "." + methodName;
  12. String SOURCE_IP = RpcContext.getContext().getRemoteAddressString();
  13. if (StringUtils.isBlank(traceID)) {
  14. traceID = RpcContext.getContext().getAttachment(TRACE_ID);
  15. }
  16. if (StringUtils.isBlank(traceID)) {
  17. traceID = UUID.randomUUID().toString().replace("-", "");
  18. }
  19. RpcContext.getContext().setAttachment(TRACE_ID, traceID);
  20. Object[] args = invocation.getArguments();
  21. long startTime = System.currentTimeMillis();
  22. String logReqParam = JSON.toJSONString(args);
  23. MDC.put(MDC_UUID, TRACE_ID);
  24. log.info("Provider.dubbo请求:traceID:[{}] 消费方 [{}] 接口 [{}] 报文 [{}] ",
  25. traceID, SOURCE_IP, REQ_SERVICE, logReqParam);
  26. Result result = null;
  27. try {
  28. result = invoker.invoke(invocation);
  29. if (result.hasException()) {
  30. log.error("Provider.调用dubbo服务发生异常:called by [{}] service [{}] method [{}] fail",
  31. SOURCE_IP, interfaceName, methodName);
  32. log.error("TraceFilter occurs exception", result.getException());
  33. }
  34. } catch (Exception e) {
  35. log.error("Provider.dubbo异常:traceID:[{}] called by [{}] service [{}] method [{}] exception [{}] ",
  36. traceID, SOURCE_IP, invoker.getInterface().getName(), invocation.getMethodName(),
  37. e.getClass().getName() + e.getMessage(), e);
  38. throw e;
  39. } finally {
  40. log.info("Provider.dubbo返回:traceID:[{}] 消费方 [{}] 接口 [{}] 返回值 [{}] 耗时 {} 毫秒", traceID,
  41. SOURCE_IP, REQ_SERVICE, JSON.toJSONString(result), System.currentTimeMillis() - startTime);
  42. MDC.clear();
  43. }
  44. return result;
  45. }
  46. }

 

dubbo还需要配置com.alibaba.dubbo.rpc.Filter

在META-INF/dubbo目录下, 添加com.alibaba.dubbo.rpc.Filter文件, 其内容为dubboConsumerFilter=com.xxx.ConsumerFilter

名称自定义,=号后面为自定义的dubbofiliter全路径名称

同时需要配置yml文件spring.dubbo.consumer.filter 为dubboConsumerFilter

provider的配置为spring.dubbo.provider.filter 为dubboProviderFilter

 

2、HTTP请求:

写一个WebContextConfiguration类继承org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport,写一个TraceInterceptor类实现org.springframework.web.servlet.HandlerInterceptor

  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
  4. import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
  5. @Configuration
  6. public class WebContextConfiguration extends WebMvcConfigurationSupport {
  7. @Override
  8. public void addInterceptors(InterceptorRegistry registry) {
  9. registry.addInterceptor(buildTraceInter()).addPathPatterns("/**").order(0);
  10. }
  11. @Bean
  12. public TraceInterceptor buildTraceInter() {
  13. return new TraceInterceptor();
  14. }
  15. }
  1. import lombok.extern.slf4j.Slf4j;
  2. import org.apache.commons.lang3.StringUtils;
  3. import org.slf4j.MDC;
  4. import org.springframework.web.servlet.HandlerInterceptor;
  5. import org.springframework.web.servlet.ModelAndView;
  6. import javax.servlet.http.HttpServletRequest;
  7. import javax.servlet.http.HttpServletResponse;
  8. @Slf4j
  9. public class TraceInterceptor implements HandlerInterceptor {
  10. public final static String TRACE_ID = "TraceID";
  11. public static final String START_TIMESTAMP = "START_TIMESTAMP";
  12. public static final String TRACE_URI = "trace_uri";
  13. public static final String UNKNOWN = "UNKNOWN";
  14. public final static String MDC_UUID = "mdc_uuid";
  15. @Override
  16. public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
  17. ThreadCache.setVal(START_TIMESTAMP, System.currentTimeMillis());
  18. MDC.put(TRACE_URI, StringUtils.defaultIfBlank(request.getRequestURI(), UNKNOWN));
  19. MDC.put(MDC_UUID, StringUtils.defaultIfBlank(request.getHeader(TRACE_ID), UUIDUtil.generateWithoutSeparator()));
  20. return true;
  21. }
  22. @Override
  23. public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) {
  24. }
  25. @Override
  26. public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) {
  27. log.info("processing completed cost time [{}ms]", System.currentTimeMillis() - (long) ThreadCache.getVal(START_TIMESTAMP));
  28. MDC.clear();
  29. }
  30. }

工具类:

  1. import java.util.HashMap;
  2. import java.util.Map;
  3. public class ThreadCache {
  4. private static final ThreadLocal<Map<String, Object>> LOCAL_CACHE = new ThreadLocal<Map<String, Object>>() {
  5. @Override
  6. protected Map<String, Object> initialValue() {
  7. return new HashMap<>(DEFAULT_SIZE);
  8. }
  9. };
  10. private static final int DEFAULT_SIZE = 8;
  11. public static <T> T getVal(String key) {
  12. return (T) LOCAL_CACHE.get().get(key);
  13. }
  14. public static <T> void setVal(String key, T val) {
  15. LOCAL_CACHE.get().put(key, val);
  16. }
  17. public static void clear(String key) {
  18. LOCAL_CACHE.get().remove(key);
  19. }
  20. }
  1. import java.util.UUID;
  2. public class UUIDUtil {
  3. public static String generateWithoutSeparator() {
  4. char[] array = new char[32]; int idx = 0;
  5. for (char c: generate().toCharArray()) {
  6. if (c != 45) {
  7. array[idx++] = c;
  8. continue;
  9. }
  10. }
  11. return new String(array);
  12. }
  13. public static String generate() {
  14. return UUID.randomUUID().toString();
  15. }
  16. }

对于http调用其他系统接口,需要在http工具类里面统一设置

request.setHeader(Trace.TRACE_ID, MDC.get(Trace.TRACE_ID));

在logback.xml中 只需在pattern中加上往MDC设置的traceID的key即可。

 

3、mq调用

以rocketMQ为例:

消费者

  1. import com.aliyun.openservices.ons.api.Action;
  2. import com.aliyun.openservices.ons.api.ConsumeContext;
  3. import com.aliyun.openservices.ons.api.Message;
  4. import com.aliyun.openservices.ons.api.MessageListener;
  5. import org.apache.commons.lang3.StringUtils;
  6. import org.slf4j.MDC;
  7. public abstract class AbstractListener implements MessageListener {
  8. public abstract Action doConsume(Message message, ConsumeContext consumeContext);
  9. @Override
  10. public Action consume(Message message, ConsumeContext consumeContext) {
  11. try {
  12. String traceId = message.getUserProperties(Trace.TRACE_ID);
  13. MDC.put(Trace.TRACE_ID, StringUtils.isNotBlank(traceId) ? traceId :
  14. StringUtils.defaultIfBlank(MDC.get(Trace.TRACE_ID), UUIDUtil.generateWithoutSeparator()));
  15. return this.doConsume(message, consumeContext);
  16. } finally {
  17. MDC.clear();
  18. }
  19. }
  20. }

消费者如何使用

  1. public class RefundMessageListener extends AbstractListener {
  2. @Override
  3. public Action doConsume(Message message, ConsumeContext consumeContext) {
  4. return null;
  5. }
  6. }

 

生产者:

  1. import com.aliyun.openservices.ons.api.Message;
  2. import com.aliyun.openservices.ons.api.SendCallback;
  3. import com.aliyun.openservices.ons.api.SendResult;
  4. import com.aliyun.openservices.ons.api.bean.ProducerBean;
  5. import org.apache.commons.lang3.StringUtils;
  6. import org.slf4j.MDC;
  7. public class ProducerTrace extends ProducerBean {
  8. @Override
  9. public SendResult send(Message message) {
  10. try {
  11. String traceId = message.getUserProperties(Trace.TRACE_ID);
  12. MDC.put(Trace.TRACE_ID, StringUtils.isNotBlank(traceId) ? traceId :
  13. StringUtils.defaultIfBlank(MDC.get(Trace.TRACE_ID), UUIDUtil.generateWithoutSeparator()));
  14. return super.send(message);
  15. } finally {
  16. MDC.clear();
  17. }
  18. }
  19. }

生产者如何使用:

1、将ProducerTrace注入

  1. @Configuration
  2. public class ProducerClient {
  3. @Autowired
  4. private RocketMQConfig mqConfig;
  5. @Autowired
  6. private LocalTransactionChecker localTransactionChecker;
  7. @Bean(initMethod = "start", destroyMethod = "shutdown")
  8. public ProducerTrace buildProducer() {
  9. ProducerTrace producer = new ProducerTrace();
  10. producer.setProperties(mqConfig.getMqProperties());
  11. return producer;
  12. }
  13. }

2、使用

  1. @Resource
  2. private ProducerTrace producerTrace;
  3. public void send(String topic, String tag, String body, Supplier<Boolean> supplier) {
  4. Message message = new Message(topic, tag, body.getBytes(StandardCharsets.UTF_8));
  5. SendResult sendResult = producerTrace.send(message)
  6. }

 

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号