微服务框架TraceId方案

微服务框架TraceId方案本文探讨了在复杂微服务项目中 由于日志分散导致的问题 提出了引入 TraceId 进行链路跟踪的解决方案

大家好,欢迎来到IT知识分享网。

一、背景与目的

在项目中随着项目的业务越来越复杂,以及项目的微服务化等,导致平常的项目中出现如:同一次操作日志散乱等现象,对通过查询日志的方式排查问题造成极大困扰。因此迫切需要一种可以追溯当前链路操作日志的手段。

通过实现traceId功能,获取当前操作链路的以下日志信息:

  1. 当前操作http请求web端有traceId的日志信息
  2. 当前操作java服务端有traceId的dubbo consumer日志信息
  3. 当前操作java服务端有traceId的dubbo provider日志信息
  4. 其他可以串起来的服务日志信息,如MQ、异步线程等…

二、MDC机制

MDC(Mapped Diagnostic Context,映射诊断上下文)是 log4j 和 logback 提供的一种方便在多线程条件下记录日志的机制。MDC 可以看成是一个与当前线程绑定的Map,类似于 ThreadLocal 可以往其中添加键值对。MDC 中包含的内容可以被同一线程中执行的代码所访问。当前线程的子线程会继承其父线程中的 MDC 的内容(只有子线程创建的时候初始化一次并不适用于线程池)。当需要记录日志时,只需要从 MDC 中获取就行。

三、dubbo框架下TraceId方案

先定义一个通用的 TraceId 生成工具类,如下:

import org.springframework.util.StringUtils; public class TraceIdUtils { 
    public static final String X_TRACE_ID = "X-TraceId"; / * 通过 ThreadLocal 存储 traceId,保证同一个线程,可以获取到同一个 traceId */ private static final ThreadLocal<String> TRACE_ID_LOCAL = new ThreadLocal(); private TraceIdUtils() { 
    } // 生成一个traceId public static String genTraceId() { 
    String traceId = genTraceIdNotCached(); TRACE_ID_LOCAL.set(traceId); return traceId; } public static String genTraceIdNotCached() { 
    // TODO 注意,一般通过雪花算法生成 TraceId,这里方便简单,直接使用时间戳(实际生产需要定义雪花算法进行生成) return String.valueOf(System.currentTimeMillis()); } // 获取traceId public static String getTraceId() { 
    return (String)TRACE_ID_LOCAL.get(); } // 获取traceId public static String gen2GetTraceId() { 
    if (TRACE_ID_LOCAL.get() == null) { 
    return genTraceId(); } return (String)TRACE_ID_LOCAL.get(); } public static void clear() { 
    TRACE_ID_LOCAL.remove(); } } 

3.1 一次用户请求链路跟踪

3.1.1 web层

定义Interceptor,并添加

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.util.StringUtils; import org.springframework.web.servlet.HandlerInterceptor; import org.springframework.web.servlet.ModelAndView; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; public class WebTraceIdInterceptor implements HandlerInterceptor { 
    private static final Logger log = LoggerFactory.getLogger(WebTraceIdInterceptor.class); @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { 
    String traceId = MDC.get(TraceIdUtils.X_TRACE_ID); if (StringUtils.isEmpty(traceId)) { 
    // 生成一个traceId traceId = TraceIdUtils.genTraceIdWithSW(); } TraceIdUtils.setTraceId(traceId); MDC.put(TraceIdUtils.X_TRACE_ID, traceId); return true; } @Override public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception { 
    TraceIdUtils.clear(); MDC.clear(); } } @Configuration public class WebConfig implements WebMvcConfigurer { 
    @Bean public WebTraceIdInterceptor webTraceIdInterceptor() { 
    return new WebTraceIdInterceptor(); } @Override public void addInterceptors(InterceptorRegistry registry) { 
    // 添加 web traceId 拦截器 registry.addInterceptor(webTraceIdInterceptor()) .addPathPatterns("/") // 拦截所有请求 .order(Integer.MIN_VALUE); // order 越小,优先级越高 } } 

3.1.2 RPC调用层

通过 Filter 、dubbo attachments参数进行处理。

  • 定义 BaseFilter :用于处理通用逻辑。
  • ConsumerTraceLogFilter:消费者Filter
  • ProviderTraceLogFilter:服务提供者Filter
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONValidator; import org.apache.dubbo.common.utils.StringUtils; import org.apache.dubbo.rpc.Result; import org.springframework.util.ClassUtils; import java.io.File; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; public class BaseFilter { 
    private static final Integer SUB_LENGTH = 2000; public List<Object> obtainMethodArguments(Object[] args) { 
    if (args == null) { 
    return Collections.emptyList(); } List<Object> params = Arrays.asList(args); return params.stream().filter(arg -> !isExclude(arg)).collect(Collectors.toList()); } public boolean isExclude(Object arg) { 
    if (arg == null) { 
    return true; } else if (ClassUtils.isAssignable(File.class, arg.getClass())) { 
    return true; } else if (ClassUtils.isAssignable(InputStream.class, arg.getClass())) { 
    return true; } else { 
    return ClassUtils.isAssignable(OutputStream.class, arg.getClass()); } } public String subLength(Result result) { 
    final String resultVal = JSON.toJSONString(result.getValue()); final JSONValidator validator = JSONValidator.from(resultVal); String str = resultVal; if (validator.validate()) { 
    final JSONValidator.Type type = validator.getType(); if (JSONValidator.Type.Array.equals(type) && resultVal.length() > SUB_LENGTH) { 
    final JSONArray objects = JSON.parseArray(str); str = this.subLength(resultVal); str = str + "json array size=" + objects.size(); } } return this.subLength(str); } public String subLength(String str) { 
    if (StringUtils.isBlank(str)) { 
    return str; } else { 
    return str.length() < SUB_LENGTH ? str : str.substring(0, SUB_LENGTH).concat("..."); } } } import com.alibaba.fastjson.JSON; import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.extension.Activate; import org.apache.dubbo.rpc.Filter; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.RpcException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @Activate(group = { 
   Constants.CONSUMER}, order = -100) public class ConsumerTraceLogFilter extends BaseFilter implements Filter { 
    private static final Logger log = LoggerFactory.getLogger(ConsumerTraceLogFilter.class); @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { 
    String traceId = MDC.get(TraceIdUtils.X_TRACE_ID); if (StringUtils.isEmpty(traceId)) { 
    // 生成一个traceId traceId = TraceIdUtils.genTraceIdWithSW(); } try { 
    if (StringUtils.isEmpty(traceId)) { 
    traceId = String.valueOf(System.currentTimeMillis()); } invocation.getAttachments().put(TraceIdUtils.X_TRACE_ID, traceId); long begin = System.currentTimeMillis(); // 执行方法 final Result result = invoker.invoke(invocation); String params = CollectionUtils.isEmpty(this.obtainMethodArguments(invocation.getArguments())) ? null : this.subLength(JSON.toJSONString(invocation.getArguments())); log.info("consumer rpc {}#{} cost={}ms params = {} result={}", invoker.getInterface().getName(), invocation.getMethodName(), System.currentTimeMillis() - begin, params, result.getValue() == null ? null : this.subLength(result)); return result; } finally { 
    TraceIdUtils.setTraceId(traceId); MDC.put(TraceIdUtils.X_TRACE_ID, traceId); } } } import com.alibaba.fastjson.JSON; import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.extension.Activate; import org.apache.dubbo.rpc.Filter; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Invoker; import org.apache.dubbo.rpc.Result; import org.apache.dubbo.rpc.RpcException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @Activate(group = { 
   Constants.PROVIDER}, order = -100) public class ProviderTraceLogFilter extends BaseFilter implements Filter { 
    private static final Logger log = LoggerFactory.getLogger(ProviderTraceLogFilter.class); @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { 
    try { 
    String params = CollectionUtils.isEmpty(this.obtainMethodArguments(invocation.getArguments())) ? null : this.subLength(JSON.toJSONString(invocation.getArguments())); String interfaceName = invoker.getInterface().getName(); String methodName = invocation.getMethodName(); String traceId = invocation.getAttachments().get(TraceIdUtils.X_TRACE_ID); if (StringUtils.isEmpty(traceId)) { 
    traceId = TraceIdUtils.genTraceIdWithSW(); log.warn("rpc生产者 traceId为空主动生成 interfaceName={}, methodName={}", interfaceName, methodName); } TraceIdUtils.setTraceId(traceId); MDC.put(TraceIdUtils.X_TRACE_ID, traceId); long begin = System.currentTimeMillis(); Result result = invoker.invoke(invocation); log.info("provider rpc {}#{} cost={}ms params = {} result={}", interfaceName, methodName, System.currentTimeMillis() - begin, params, result.getValue() == null ? null : this.subLength(result)); return result; } finally { 
    TraceIdUtils.clear(); MDC.clear(); } } } 

3.2 线程池

编写 TaskDecorator

import org.apache.commons.lang3.StringUtils; import org.slf4j.MDC; import org.springframework.core.task.TaskDecorator; import java.util.Map; public class TraceIdTaskDecorator implements TaskDecorator { 
    @Override public Runnable decorate(Runnable runnable) { 
    // 获取当前线程调用MDC上下文 Map<String, String> contextMap = MDC.getCopyOfContextMap(); return () -> { 
    try { 
    if (contextMap != null) { 
    // 设置当前线程上下文map MDC.setContextMap(contextMap); } String traceId = MDC.get(TraceIdUtils.X_TRACE_ID); if (StringUtils.isBlank(traceId)) { 
    traceId = TraceIdUtils.genTraceIdWithSW(); MDC.put(TraceIdUtils.X_TRACE_ID, traceId); } // 放到threadLocal 里面, 异步线程RPC调用 ConsumerTraceLogFilter 是从ThreadLocal中获取 TraceIdUtils.setTraceId(traceId); MDC.put(TraceIdUtils.X_TRACE_ID, traceId); runnable.run(); } finally { 
    TraceIdUtils.clear(); MDC.clear(); } }; } } 

添加 TaskDecorator
方法1:使用 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setTaskDecorator(new TraceIdTaskDecorator()); 

方法2:重写 ThreadPoolTaskExecutor.execute(Runable) 方法

private ExecutorService pool = new ThreadPoolExecutor(1, 5, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new DefaultThreadFactory("fefault-thread-factory")) { 
    @Override public void execute(Runnable command) { 
    // 重写execute方法 Runnable decorated = new TraceIdTaskDecorator().decorate(command); super.execute(decorated); } }; 

3.3 其他

通过定义切面拦截器进行处理,可以用于job、mq consumer等其他场景。

1. 定义TraceId注解

@Target({ 
   ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface TraceId { 
    } 

2. 定义切面

import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.slf4j.MDC; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @Aspect @Component public class TraceIdAspect { 
    @Pointcut("@annotation(xxx.xxx.TraceId)") public void pointcut() { 
   } @Around("pointcut()") public Object around(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { 
    try { 
    String traceId = MDC.get(TraceIdUtils.X_TRACE_ID); if (StringUtils.isEmpty(traceId)) { 
    // 生成一个traceId traceId = TraceIdUtils.genTraceIdWithSW(); } TraceIdUtils.setTraceId(traceId); MDC.put(TraceIdUtils.X_TRACE_ID, traceId); return proceedingJoinPoint.proceed(); } finally { 
    TraceIdUtils.clear(); MDC.clear(); } } } 

3. 给需要添加log traceId的方法添加注解

@Component @Slf4j public class TestTraceId { 
    @TraceId public void test(Message msg) { 
    log.info("xxxxxxxxx"); } } 

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/115323.html

(0)
上一篇 2025-12-03 16:15
下一篇 2025-12-03 16:26

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信