大家好,欢迎来到IT知识分享网。
微服务框架TraceId方案
一、背景与目的
随着业务越来越复杂以及项目逐步微服务化,同一次操作产生的日志往往分散在多个服务、多个线程之中,给通过查询日志排查问题带来了极大困扰。因此迫切需要一种能够追溯同一条调用链路上全部操作日志的手段。
通过实现traceId功能,获取当前操作链路的以下日志信息:
- 当前操作http请求web端有traceId的日志信息
- 当前操作java服务端有traceId的dubbo consumer日志信息
- 当前操作java服务端有traceId的dubbo provider日志信息
- 其他可以串起来的服务日志信息,如MQ、异步线程等…
二、MDC机制
MDC(Mapped Diagnostic Context,映射诊断上下文)是 log4j 和 logback 提供的一种便于在多线程环境下记录日志的机制。MDC 可以看成是一个与当前线程绑定的 Map,类似于 ThreadLocal,可以往其中添加键值对;MDC 中的内容可以被同一线程中执行的代码访问。在部分实现中(如 log4j 1.x 以及 logback 1.1.5 之前的版本),子线程会继承父线程的 MDC 内容,但这种继承只在子线程创建时发生一次,因此不适用于线程会被复用的线程池;较新版本的 logback 已不再自动继承,需要像下文 3.2 节那样手动传递。当需要记录日志时,只需要在日志格式中引用 MDC 中的键(例如 %X{X-TraceId})即可。
三、dubbo框架下TraceId方案
先定义一个通用的 TraceId 生成工具类,如下:
import java.util.UUID;

/**
 * Generates trace ids and keeps the current thread's trace id in a {@link ThreadLocal}.
 *
 * <p>The value is bound to the calling thread only. Callers that run on pooled threads must
 * call {@link #clear()} when their unit of work ends, otherwise the id leaks into the next
 * task executed by the same thread.
 */
public final class TraceIdUtils {

    /** Key under which the trace id is published (MDC key / RPC attachment key). */
    public static final String X_TRACE_ID = "X-TraceId";

    /** Per-thread storage so that every call on the same thread sees the same trace id. */
    private static final ThreadLocal<String> TRACE_ID_LOCAL = new ThreadLocal<>();

    private TraceIdUtils() {
    }

    /**
     * Generates a new trace id and binds it to the current thread.
     *
     * @return the newly generated trace id
     */
    public static String genTraceId() {
        String traceId = genTraceIdNotCached();
        TRACE_ID_LOCAL.set(traceId);
        return traceId;
    }

    /**
     * Generates a new trace id without binding it to the current thread.
     *
     * <p>A random UUID (dashes removed) is used instead of a millisecond timestamp: two
     * requests arriving in the same millisecond would otherwise share one trace id and their
     * logs could not be told apart. Swap in a snowflake generator here if time-ordered ids
     * are required.
     *
     * @return a new, practically unique trace id
     */
    public static String genTraceIdNotCached() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    /**
     * Generates a trace id for the current thread; this is the entry point used by the
     * interceptors, filters and aspects.
     *
     * <p>NOTE(review): "SW" presumably stands for SkyWalking, i.e. the original intent was to
     * reuse the APM agent's trace id when one is available. No such dependency is visible
     * here, so this implementation simply delegates to {@link #genTraceId()} — confirm and
     * plug in the agent lookup if required.
     *
     * @return the trace id now bound to the current thread
     */
    public static String genTraceIdWithSW() {
        return genTraceId();
    }

    /**
     * Binds the given trace id to the current thread.
     *
     * @param traceId the trace id to bind; {@code null} or empty removes the current binding
     */
    public static void setTraceId(String traceId) {
        if (traceId == null || traceId.isEmpty()) {
            TRACE_ID_LOCAL.remove();
        } else {
            TRACE_ID_LOCAL.set(traceId);
        }
    }

    /**
     * Returns the trace id bound to the current thread.
     *
     * @return the bound trace id, or {@code null} if none is bound
     */
    public static String getTraceId() {
        return TRACE_ID_LOCAL.get();
    }

    /**
     * Returns the trace id bound to the current thread, generating and binding one first if
     * none is bound yet.
     *
     * @return the (possibly freshly generated) trace id of the current thread
     */
    public static String gen2GetTraceId() {
        String current = TRACE_ID_LOCAL.get();
        return current != null ? current : genTraceId();
    }

    /** Removes the trace id binding of the current thread. */
    public static void clear() {
        TRACE_ID_LOCAL.remove();
    }
}
3.1 一次用户请求链路跟踪
3.1.1 web层
定义 Interceptor,并将其添加到 Spring MVC 的拦截器配置中:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.util.StringUtils; import org.springframework.web.servlet.HandlerInterceptor; import org.springframework.web.servlet.ModelAndView; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; public class WebTraceIdInterceptor implements HandlerInterceptor {
private static final Logger log = LoggerFactory.getLogger(WebTraceIdInterceptor.class); @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
String traceId = MDC.get(TraceIdUtils.X_TRACE_ID); if (StringUtils.isEmpty(traceId)) {
// 生成一个traceId traceId = TraceIdUtils.genTraceIdWithSW(); } TraceIdUtils.setTraceId(traceId); MDC.put(TraceIdUtils.X_TRACE_ID, traceId); return true; } @Override public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
TraceIdUtils.clear(); MDC.clear(); } } @Configuration public class WebConfig implements WebMvcConfigurer {
@Bean public WebTraceIdInterceptor webTraceIdInterceptor() {
return new WebTraceIdInterceptor(); } @Override public void addInterceptors(InterceptorRegistry registry) {
// 添加 web traceId 拦截器 registry.addInterceptor(webTraceIdInterceptor()) .addPathPatterns("/") // 拦截所有请求 .order(Integer.MIN_VALUE); // order 越小,优先级越高 } }
3.1.2 RPC调用层
通过 Filter 、dubbo attachments参数进行处理。
- 定义 BaseFilter :用于处理通用逻辑。
- ConsumerTraceLogFilter:消费者Filter
- ProviderTraceLogFilter:服务提供者Filter
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONValidator;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.Result;
import org.springframework.util.ClassUtils;

import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Shared helpers for the Dubbo trace-log filters: selecting which call arguments may be
 * logged and truncating long log payloads.
 */
public class BaseFilter {

    /** Maximum number of characters of a payload that is written to the log. */
    private static final int SUB_LENGTH = 2000;

    /**
     * Returns the arguments that are safe to log.
     *
     * @param args the raw invocation arguments, may be {@code null}
     * @return the arguments for which {@link #isExclude(Object)} is {@code false};
     *         an empty list when {@code args} is {@code null}
     */
    public List<Object> obtainMethodArguments(Object[] args) {
        if (args == null) {
            return Collections.emptyList();
        }
        return Arrays.stream(args)
                .filter(arg -> !isExclude(arg))
                .collect(Collectors.toList());
    }

    /**
     * Tells whether an argument must be left out of the log.
     *
     * @param arg the argument to inspect
     * @return {@code true} for {@code null} and for {@link File}, {@link InputStream} and
     *         {@link OutputStream} instances
     */
    public boolean isExclude(Object arg) {
        return arg == null
                || arg instanceof File
                || arg instanceof InputStream
                || arg instanceof OutputStream;
    }

    /**
     * Serializes the loggable arguments to JSON and truncates the text for logging.
     *
     * <p>Only the arguments that pass {@link #obtainMethodArguments(Object[])} are
     * serialized, so files and streams never reach the JSON serializer.
     *
     * @param args the raw invocation arguments, may be {@code null}
     * @return the truncated JSON text, or {@code null} when there is nothing to log
     */
    protected String formatArguments(Object[] args) {
        final List<Object> loggable = this.obtainMethodArguments(args);
        return loggable.isEmpty() ? null : this.subLength(JSON.toJSONString(loggable));
    }

    /**
     * Serializes the result value to JSON and truncates it for logging. When the JSON is an
     * array that had to be truncated, the element count is appended after the truncated text
     * so the size of the full result is still visible.
     *
     * @param result the RPC result
     * @return the (possibly truncated) JSON text of the result value
     */
    public String subLength(Result result) {
        final String resultVal = JSON.toJSONString(result.getValue());
        if (resultVal.length() > SUB_LENGTH) {
            final JSONValidator validator = JSONValidator.from(resultVal);
            if (validator.validate() && JSONValidator.Type.Array.equals(validator.getType())) {
                final JSONArray objects = JSON.parseArray(resultVal);
                // Return directly: truncating again would cut the size suffix off.
                return this.subLength(resultVal) + "json array size=" + objects.size();
            }
        }
        return this.subLength(resultVal);
    }

    /**
     * Truncates a string to at most {@code SUB_LENGTH} characters, appending {@code "..."}
     * only when characters were actually dropped.
     *
     * @param str the text to truncate, may be blank or {@code null}
     * @return {@code str} itself when it is blank or short enough, otherwise its prefix
     *         followed by {@code "..."}
     */
    public String subLength(String str) {
        if (StringUtils.isBlank(str)) {
            return str;
        }
        return str.length() <= SUB_LENGTH ? str : str.substring(0, SUB_LENGTH).concat("...");
    }
}

import com.alibaba.fastjson.JSON;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/**
 * Consumer-side Dubbo filter: attaches the current trace id to the outgoing invocation and
 * logs the call with its cost, arguments and result.
 */
@Activate(group = {Constants.CONSUMER}, order = -100)
public class ConsumerTraceLogFilter extends BaseFilter implements Filter {

    private static final Logger log = LoggerFactory.getLogger(ConsumerTraceLogFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // Resolve the trace id: MDC first, then the TraceIdUtils thread-local, then a new one.
        String traceId = MDC.get(TraceIdUtils.X_TRACE_ID);
        if (StringUtils.isEmpty(traceId)) {
            traceId = TraceIdUtils.getTraceId();
        }
        if (StringUtils.isEmpty(traceId)) {
            traceId = TraceIdUtils.genTraceIdWithSW();
        }
        if (StringUtils.isEmpty(traceId)) {
            // Last-resort fallback in case the generator yields nothing.
            traceId = String.valueOf(System.currentTimeMillis());
        }
        // Bind before invoking so the log line written below already carries the id.
        TraceIdUtils.setTraceId(traceId);
        MDC.put(TraceIdUtils.X_TRACE_ID, traceId);
        try {
            invocation.getAttachments().put(TraceIdUtils.X_TRACE_ID, traceId);
            final long begin = System.currentTimeMillis();
            // Execute the remote call.
            final Result result = invoker.invoke(invocation);
            final long cost = System.currentTimeMillis() - begin;
            final String params = this.formatArguments(invocation.getArguments());
            log.info("consumer rpc {}#{} cost={}ms params = {} result={}",
                    invoker.getInterface().getName(), invocation.getMethodName(), cost, params,
                    result.getValue() == null ? null : this.subLength(result));
            return result;
        } finally {
            // Re-bind after the call: anything that ran on this thread during invoke() and
            // cleared the context must not leave the caller without its trace id.
            TraceIdUtils.setTraceId(traceId);
            MDC.put(TraceIdUtils.X_TRACE_ID, traceId);
        }
    }
}

import com.alibaba.fastjson.JSON;
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/**
 * Provider-side Dubbo filter: picks the trace id up from the invocation attachments (or
 * generates one), binds it for the duration of the call, logs the call, and clears the
 * thread's trace context afterwards.
 */
@Activate(group = {Constants.PROVIDER}, order = -100)
public class ProviderTraceLogFilter extends BaseFilter implements Filter {

    private static final Logger log = LoggerFactory.getLogger(ProviderTraceLogFilter.class);

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        try {
            final String params = this.formatArguments(invocation.getArguments());
            final String interfaceName = invoker.getInterface().getName();
            final String methodName = invocation.getMethodName();
            String traceId = invocation.getAttachments().get(TraceIdUtils.X_TRACE_ID);
            if (StringUtils.isEmpty(traceId)) {
                // The consumer did not send an id: start a new trace on the provider side.
                traceId = TraceIdUtils.genTraceIdWithSW();
                log.warn("rpc生产者 traceId为空主动生成 interfaceName={}, methodName={}", interfaceName, methodName);
            }
            TraceIdUtils.setTraceId(traceId);
            MDC.put(TraceIdUtils.X_TRACE_ID, traceId);
            final long begin = System.currentTimeMillis();
            final Result result = invoker.invoke(invocation);
            final long cost = System.currentTimeMillis() - begin;
            log.info("provider rpc {}#{} cost={}ms params = {} result={}",
                    interfaceName, methodName, cost, params,
                    result.getValue() == null ? null : this.subLength(result));
            return result;
        } finally {
            // Always clear, whatever the outcome of the call.
            TraceIdUtils.clear();
            MDC.clear();
        }
    }
}
3.2 线程池
编写 TaskDecorator
import org.apache.commons.lang3.StringUtils; import org.slf4j.MDC; import org.springframework.core.task.TaskDecorator; import java.util.Map; public class TraceIdTaskDecorator implements TaskDecorator {
@Override public Runnable decorate(Runnable runnable) {
// 获取当前线程调用MDC上下文 Map<String, String> contextMap = MDC.getCopyOfContextMap(); return () -> {
try {
if (contextMap != null) {
// 设置当前线程上下文map MDC.setContextMap(contextMap); } String traceId = MDC.get(TraceIdUtils.X_TRACE_ID); if (StringUtils.isBlank(traceId)) {
traceId = TraceIdUtils.genTraceIdWithSW(); MDC.put(TraceIdUtils.X_TRACE_ID, traceId); } // 放到threadLocal 里面, 异步线程RPC调用 ConsumerTraceLogFilter 是从ThreadLocal中获取 TraceIdUtils.setTraceId(traceId); MDC.put(TraceIdUtils.X_TRACE_ID, traceId); runnable.run(); } finally {
TraceIdUtils.clear(); MDC.clear(); } }; } }
添加 TaskDecorator
方法1:使用 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
// Create the executor and install TraceIdTaskDecorator as its task decorator.
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setTaskDecorator(new TraceIdTaskDecorator());
方法2:重写 ThreadPoolExecutor.execute(Runnable) 方法
private ExecutorService pool = new ThreadPoolExecutor(1, 5, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new DefaultThreadFactory("fefault-thread-factory")) {
@Override public void execute(Runnable command) {
// 重写execute方法 Runnable decorated = new TraceIdTaskDecorator().decorate(command); super.execute(decorated); } };
3.3 其他
通过定义切面拦截器进行处理,可以用于job、mq consumer等其他场景。
1. 定义TraceId注解
/**
 * Marks a method that should run with a trace id bound to its thread.
 *
 * <p>{@code @Inherited} is deliberately not declared: it only affects annotations placed on
 * classes and has no effect on a method-targeted annotation like this one.
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface TraceId {
}
2. 定义切面
import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.slf4j.MDC; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @Aspect @Component public class TraceIdAspect {
@Pointcut("@annotation(xxx.xxx.TraceId)") public void pointcut() {
} @Around("pointcut()") public Object around(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
try {
String traceId = MDC.get(TraceIdUtils.X_TRACE_ID); if (StringUtils.isEmpty(traceId)) {
// 生成一个traceId traceId = TraceIdUtils.genTraceIdWithSW(); } TraceIdUtils.setTraceId(traceId); MDC.put(TraceIdUtils.X_TRACE_ID, traceId); return proceedingJoinPoint.proceed(); } finally {
TraceIdUtils.clear(); MDC.clear(); } } }
3. 给需要添加log traceId的方法添加注解
/**
 * Example bean showing how to opt a method into trace-id handling.
 */
@Slf4j
@Component
public class TestTraceId {

    /**
     * Sample entry point; the {@code @TraceId} annotation marks it for the trace-id aspect.
     *
     * @param msg the incoming message (unused in this sample)
     */
    @TraceId
    public void test(Message msg) {
        log.info("xxxxxxxxx");
    }
}
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/115323.html