简介
分布式系统中,由多个服务构成,每个请求路由过来后,会在多个服务中留下追踪ID,可以基于此追踪ID排查问题,分析请求的执行链路。
业界也有比较成熟的链路追踪ID方案,比如Skywalking,它基于动态字节码技术,本身会增加系统的复杂性,以及它需要单独部署服务对信息进行采集,这对于ToB资源敏感的场景下并不适用。traceID是基于自身必需框架开发的,主要技术是日志MDC、跨线程包装、中间件拦截器,所以可以几乎不增加任何额外资源开销。
本次traceId支持dubbo,kafka,http请求,线程池
接入步骤
1、日志中增加traceId变量
logback.xml日志配置,当然你也可以自定义格式,变量%X{traceId}是追踪ID替换值:
若框架使用log4j,在日志格式中增加%X{traceId}变量
<property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - [%X{traceId}] - %msg%n"/>
2、代码实现
2.0 依赖,不同实现按需添加依赖,例如kafka相关接入traceId,项目本身已经引入kafka
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency>
2.1 http请求
编写拦截器
public class TraceWebInterceptor extends HandlerInterceptorAdapter { private static final Logger LOGGER = LoggerFactory.getLogger(TraceWebInterceptor.class); @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { request.setAttribute("startTime", System.currentTimeMillis()); //traceOrigin、traceCaller、traceId String traceOrigin = request.getHeader(TraceConstants.LOG_TRACE_ORIGIN); String traceCaller = request.getHeader(TraceConstants.LOG_TRACE_CALLER); String traceId = request.getHeader(TraceConstants.LOG_TRACE_ID); //如果不存在traceId需要生成 if (StringUtils.isBlank(traceId)) { boolean generate = TraceUtil.loadTraceInfo(); if(generate) { LOGGER.debug("[生成追踪信息]" + TraceUtil.getTraceInfoString()); } }else { //设置MDC MDC.put(TraceConstants.LOG_TRACE_ORIGIN, traceOrigin); MDC.put(TraceConstants.LOG_TRACE_CALLER, traceCaller); MDC.put(TraceConstants.LOG_TRACE_ID, traceId); } //IP String traceIp = IpUtil.getIp(request); MDC.put(TraceConstants.LOG_TRACE_IP, traceIp); //响应返回 response.setHeader(TraceConstants.LOG_TRACE_ID, TraceUtil.getTraceId()); return super.preHandle(request, response, handler); } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws IOException { if (LOGGER.isInfoEnabled()) { long upmsStartTime = (long) request.getAttribute("startTime"); long upmsEndTime = System.currentTimeMillis(); long upmsIntervalTime = upmsEndTime - upmsStartTime; LOGGER.info("{} {}接口耗时{}毫秒", request.getRequestURL(), request.getMethod(), upmsIntervalTime); } MDC.clear(); }
编写Config类, 将拦截器TraceWebInterceptor添加到容器
@Configuration @ConditionalOnClass({HandlerInterceptorAdapter.class, MDC.class, HttpServletRequest.class}) public class TraceWebAutoConfiguration implements WebMvcConfigurer { private static List<String> EXCLUDE_PATHS = new ArrayList<>(); @Value("${" + TraceConstants.CONFIG_TRACE_EXCLUDE_PATHS + ":}") private String excludePaths; @Bean public TraceWebInterceptor traceWebInterceptor() { return new TraceWebInterceptor(); } @Override public void addInterceptors(InterceptorRegistry registry) { EXCLUDE_PATHS.add("/error"); EXCLUDE_PATHS.add("/actuator/**"); if (StringUtils.isNotBlank(excludePaths)) { if (excludePaths.contains(",")) { String[] split = excludePaths.split(","); EXCLUDE_PATHS.addAll(Arrays.asList(split)); } else { EXCLUDE_PATHS.add(excludePaths); } } //该方式不能过全部过滤掉 registry.addInterceptor(traceWebInterceptor()).order(-100).excludePathPatterns(EXCLUDE_PATHS); } }
工具类
public class IpUtil { private static final String UNKNOWN = "unknown"; public static String getIp(HttpServletRequest request) { if (request == null) { return UNKNOWN; } String ip = request.getHeader("x-forwarded-for"); if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) { ip = request.getHeader("Proxy-Client-IP"); } if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) { ip = request.getHeader("X-Forwarded-For"); } if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) { ip = request.getHeader("WL-Proxy-Client-IP"); } if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) { ip = request.getHeader("X-Real-IP"); } if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) { ip = request.getRemoteAddr(); } return "0:0:0:0:0:0:0:1".equals(ip) ? "127.0.0.1" : ip; } } public class TraceConstants { public static final String LOG_TRACE_ORIGIN = "traceOrigin"; public static final String LOG_TRACE_CALLER = "traceCaller"; public static final String LOG_TRACE_IP = "traceIp"; public static final String LOG_TRACE_ID = "traceId"; public static final String CONFIG_TRACE_EXCLUDE_PATHS = "trace.exclude.paths"; public TraceConstants() { } } import java.util.UUID; import org.apache.commons.lang3.StringUtils; import org.slf4j.MDC; public class TraceUtil { private static boolean simbaHttpClientInterceptorFlag = true; private static boolean sdkInterceptorFlag = false; private static String applicationName; public TraceUtil() { } public static void setApplicationName(String applicationName) { TraceUtil.applicationName = applicationName; } public static String getApplicationName() { return applicationName; } public static boolean getSimbaHttpClientInterceptorFlag() { return simbaHttpClientInterceptorFlag; } public static void setSimbaHttpClientInterceptorFlag(boolean simbaHttpClientInterceptorFlag) { TraceUtil.simbaHttpClientInterceptorFlag = simbaHttpClientInterceptorFlag; } public static boolean getSdkInterceptorFlag() { return sdkInterceptorFlag; } public static void setSdkInterceptorFlag(boolean sdkInterceptorFlag) { TraceUtil.sdkInterceptorFlag = sdkInterceptorFlag; } public static void setTraceCaller(String traceCaller) { MDC.put("traceCaller", traceCaller); } public static String getTraceCaller() { return MDC.get("traceCaller"); } public static void setTraceOrigin(String traceOrigin) { MDC.put("traceOrigin", traceOrigin); } public static String getTraceOrigin() { return MDC.get("traceOrigin"); } public static void setTraceId(String traceId) { MDC.put("traceId", traceId); } public static void removeTraceId() { MDC.remove("traceId"); } public static void clearMdc() { MDC.clear(); } public static String getTraceId() { return MDC.get("traceId"); } public static String genTraceId() { return UUID.randomUUID().toString().replace("-", ""); } public static String getTraceIp() { return MDC.get("traceIp"); } public static void setTraceIp(String traceIp) { MDC.put("traceIp", traceIp); } public static boolean loadTraceInfo() { boolean generate = false; String traceId = getTraceId(); if (StringUtils.isBlank(traceId)) { traceId = genTraceId(); generate = true; } setTraceId(traceId); return generate; } public static String getTraceInfoString() { return "TraceId:" + getTraceId() + ". traceCaller:" + getTraceCaller() + ". traceOrigin:" + getTraceOrigin(); } } import org.slf4j.MDC; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class WrapUtil { public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) { return () -> { if (context == null) { MDC.clear(); } else { MDC.setContextMap(context); } TraceUtil.loadTraceInfo(); try { return callable.call(); } finally { MDC.clear(); } }; } public static <T> Callable<T> wrap(final Callable<T> callable) { return wrap(callable, MDC.getCopyOfContextMap()); } public static Runnable wrap(final Runnable runnable, final Map<String, String> context) { return () -> { if (context == null) { MDC.clear(); } else { MDC.setContextMap(context); } TraceUtil.loadTraceInfo(); try { runnable.run(); } finally { MDC.clear(); } }; } public static Runnable wrap(final Runnable runnable) { return wrap(runnable, MDC.getCopyOfContextMap()); } public static ThreadPoolExecutor newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { return new ThreadPoolExecutorMdcWrapper(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } public static ThreadPoolExecutor newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { return new ThreadPoolExecutorMdcWrapper(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public static ThreadPoolExecutor newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { return new ThreadPoolExecutorMdcWrapper(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public static ThreadPoolExecutor newThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { return new ThreadPoolExecutorMdcWrapper(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public static ForkJoinPool newForkJoinPool() { return new ForkJoinPoolMdcWrapper(); } public static class ThreadPoolExecutorMdcWrapper extends ThreadPoolExecutor { public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public ThreadPoolExecutorMdcWrapper(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } @Override public void execute(Runnable task) { super.execute(WrapUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public <T> Future<T> submit(Runnable task, T result) { return super.submit(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), result); } @Override public <T> Future<T> submit(Callable<T> task) { return super.submit(WrapUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public Future<?> submit(Runnable task) { return super.submit(WrapUtil.wrap(task, MDC.getCopyOfContextMap())); } } public static class ForkJoinPoolMdcWrapper extends ForkJoinPool { public ForkJoinPoolMdcWrapper() { super(); } public ForkJoinPoolMdcWrapper(int parallelism) { super(parallelism); } public ForkJoinPoolMdcWrapper(int parallelism, ForkJoinWorkerThreadFactory factory, Thread.UncaughtExceptionHandler handler, boolean asyncMode) { super(parallelism, factory, handler, asyncMode); } @Override public void execute(Runnable task) { super.execute(WrapUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public <T> ForkJoinTask<T> submit(Runnable task, T result) { return super.submit(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), result); } @Override public <T> ForkJoinTask<T> submit(Callable<T> task) { return super.submit(WrapUtil.wrap(task, MDC.getCopyOfContextMap())); } } }
2.2 Spring中的@Async注解中的线程池
@Configuration public class TaskExecutorConfig { @Bean public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = TraceSpringTraceWrapUtil.newThreadPoolTaskExecutor(); // 设置核心线程数 executor.setCorePoolSize(Runtime.getRuntime().availableProcessors()); // 设置最大线程数 executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 10); // 设置队列容量 executor.setQueueCapacity(Runtime.getRuntime().availableProcessors() * 10); // 设置线程活跃时间(秒) executor.setKeepAliveSeconds(10); // 设置默认线程名称 executor.setThreadNamePrefix("scheduled-"); // 设置拒绝策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 等待所有任务结束后再关闭线程池 executor.setWaitForTasksToCompleteOnShutdown(true); executor.initialize(); return executor; } }
相关工具类
import com.startdt.license.util.WrapUtil; import org.slf4j.MDC; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.util.concurrent.ListenableFuture; import java.util.Date; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.RunnableScheduledFuture; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; public class TraceSpringTraceWrapUtil extends WrapUtil { public static class ThreadPoolTaskExecutorMdcWrapper extends ThreadPoolTaskExecutor { @Override public void execute(Runnable task) { super.execute(WrapUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public void execute(Runnable task, long startTimeout) { super.execute(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), startTimeout); } @Override public <T> Future<T> submit(Callable<T> task) { return super.submit(WrapUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public Future<?> submit(Runnable task) { return super.submit(WrapUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public ListenableFuture<?> submitListenable(Runnable task) { return super.submitListenable(WrapUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public <T> ListenableFuture<T> submitListenable(Callable<T> task) { return super.submitListenable(WrapUtil.wrap(task, MDC.getCopyOfContextMap())); } } public static class ScheduledThreadPoolTaskExecutorMdcWrapper extends ScheduledThreadPoolExecutor { public ScheduledThreadPoolTaskExecutorMdcWrapper(int corePoolSize) { super(corePoolSize); } public ScheduledThreadPoolTaskExecutorMdcWrapper(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, threadFactory); } @Override protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) { return super.decorateTask(WrapUtil.wrap(callable, MDC.getCopyOfContextMap()), task); } @Override protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) { return super.decorateTask(WrapUtil.wrap(runnable, MDC.getCopyOfContextMap()), task); } @Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { return super.schedule(WrapUtil.wrap(command, MDC.getCopyOfContextMap()), delay, unit); } @Override public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { return super.scheduleAtFixedRate(WrapUtil.wrap(command, MDC.getCopyOfContextMap()), initialDelay, period, unit); } } public static class ThreadPoolTaskSchedulerWrapper extends ThreadPoolTaskScheduler { @Override public ScheduledFuture<?> schedule(Runnable task, Date startTime) { return super.schedule(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), startTime); } @Override public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) { return super.schedule(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), trigger); } @Override public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) { return super.scheduleAtFixedRate(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), period); } @Override public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) { return super.scheduleAtFixedRate(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), startTime, period); } @Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay) { return super.scheduleWithFixedDelay(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), delay); } } public static ThreadPoolTaskExecutor newThreadPoolTaskExecutor() { return new ThreadPoolTaskExecutorMdcWrapper(); } public static ScheduledThreadPoolExecutor newScheduledThreadPoolExecutor(int corePoolSize) { return new ScheduledThreadPoolTaskExecutorMdcWrapper(corePoolSize); } public static ScheduledThreadPoolExecutor newScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolTaskExecutorMdcWrapper(corePoolSize, threadFactory); } public static ThreadPoolTaskScheduler newThreadPoolTaskScheduler() { return new ThreadPoolTaskSchedulerWrapper(); } }
2.3 spring线程池
import com.startdt.license.util.WrapUtil; import org.slf4j.MDC; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.util.concurrent.ListenableFuture; import java.util.Date; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.RunnableScheduledFuture; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; public class TraceSpringTraceWrapUtil extends WrapUtil { public static class ThreadPoolTaskExecutorMdcWrapper extends ThreadPoolTaskExecutor { @Override public void execute(Runnable task) { super.execute(WrapUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public void execute(Runnable task, long startTimeout) { super.execute(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), startTimeout); } @Override public <T> Future<T> submit(Callable<T> task) { return super.submit(WrapUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public Future<?> submit(Runnable task) { return super.submit(WrapUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public ListenableFuture<?> submitListenable(Runnable task) { return super.submitListenable(WrapUtil.wrap(task, MDC.getCopyOfContextMap())); } @Override public <T> ListenableFuture<T> submitListenable(Callable<T> task) { return super.submitListenable(WrapUtil.wrap(task, MDC.getCopyOfContextMap())); } } public static class ScheduledThreadPoolTaskExecutorMdcWrapper extends ScheduledThreadPoolExecutor { public ScheduledThreadPoolTaskExecutorMdcWrapper(int corePoolSize) { super(corePoolSize); } public ScheduledThreadPoolTaskExecutorMdcWrapper(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, threadFactory); } @Override protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) { return super.decorateTask(WrapUtil.wrap(callable, MDC.getCopyOfContextMap()), task); } @Override protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) { return super.decorateTask(WrapUtil.wrap(runnable, MDC.getCopyOfContextMap()), task); } @Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { return super.schedule(WrapUtil.wrap(command, MDC.getCopyOfContextMap()), delay, unit); } @Override public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { return super.scheduleAtFixedRate(WrapUtil.wrap(command, MDC.getCopyOfContextMap()), initialDelay, period, unit); } } public static class ThreadPoolTaskSchedulerWrapper extends ThreadPoolTaskScheduler { @Override public ScheduledFuture<?> schedule(Runnable task, Date startTime) { return super.schedule(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), startTime); } @Override public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) { return super.schedule(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), trigger); } @Override public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) { return super.scheduleAtFixedRate(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), period); } @Override public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) { return super.scheduleAtFixedRate(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), startTime, period); } @Override public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay) { return super.scheduleWithFixedDelay(WrapUtil.wrap(task, MDC.getCopyOfContextMap()), delay); } } public static ThreadPoolTaskExecutor newThreadPoolTaskExecutor() { return new ThreadPoolTaskExecutorMdcWrapper(); } public static ScheduledThreadPoolExecutor newScheduledThreadPoolExecutor(int corePoolSize) { return new ScheduledThreadPoolTaskExecutorMdcWrapper(corePoolSize); } public static ScheduledThreadPoolExecutor newScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolTaskExecutorMdcWrapper(corePoolSize, threadFactory); } public static ThreadPoolTaskScheduler newThreadPoolTaskScheduler() { return new ThreadPoolTaskSchedulerWrapper(); } }
2.4 dubbo rpc
@Activate(group = {DubboConstants.CONSUMER} , order = -9999) public class TraceDubboConsumerFilter implements Filter { private Logger logger = LoggerFactory.getLogger(TraceDubboConsumerFilter.class); @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { String traceId = TraceUtil.getTraceId(); String traceCaller = TraceUtil.getTraceCaller(); String traceOrigin = TraceUtil.getTraceOrigin(); if(StringUtils.isBlank(traceId)) { traceId = TraceUtil.genTraceId(); traceCaller = TraceUtil.getApplicationName(); traceOrigin = TraceUtil.getApplicationName(); logger.debug("[仅生成追踪信息]traceId:{}. traceCaller:{}. traceOrigin:{}", traceId, traceCaller, traceOrigin); } RpcContext.getContext().setAttachment(TraceConstants.LOG_TRACE_ID, traceId); RpcContext.getContext().setAttachment(TraceConstants.LOG_TRACE_CALLER, traceCaller); RpcContext.getContext().setAttachment(TraceConstants.LOG_TRACE_ORIGIN, traceOrigin); return invoker.invoke(invocation); } }
@Activate(group = {DubboConstants.PROVIDER}, order = -10000) public class TraceDubboProviderFilter implements Filter { private Logger logger = LoggerFactory.getLogger(TraceDubboConsumerFilter.class); @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { //traceOrigin、traceCaller、traceId String traceId = invocation.getAttachment(TraceConstants.LOG_TRACE_ID); String traceCaller = invocation.getAttachment(TraceConstants.LOG_TRACE_CALLER); String traceOrigin = invocation.getAttachment(TraceConstants.LOG_TRACE_ORIGIN); //如果不存在traceId需要生成 if (StringUtils.isBlank(traceId)) { boolean generate = TraceUtil.loadTraceInfo(); if(generate) { logger.info("[生成追踪信息]" + TraceUtil.getTraceInfoString()); } }else { //设置MDC MDC.put(TraceConstants.LOG_TRACE_ORIGIN, traceOrigin); MDC.put(TraceConstants.LOG_TRACE_CALLER, traceCaller); MDC.put(TraceConstants.LOG_TRACE_ID, traceId); } //IP String clientIp = RpcContext.getContext().getRemoteHost(); MDC.put(TraceConstants.LOG_TRACE_IP, clientIp); try { return invoker.invoke(invocation); } finally { MDC.clear(); } } }
2.5 kafka
生产者:
a:增加配置项:
spring: kafka: producer: properties: interceptor: classes: com.trace.kafka.TraceProducerInterceptor
b.kafkaTemplateAutoConfig代码中增加
if (StringUtils.isNotBlank(producerInterceptors)) { props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, producerInterceptors); }
消费者前增加代码:
TraceConsumerUtil.trace(consumerRecord);
消费者工具类
import com.startdt.simba.boot.common.constants.TraceConstants; import com.startdt.simba.boot.common.utils.TraceUtil; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Header; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TraceConsumerUtil { private static final Logger LOGGER = LoggerFactory.getLogger(TraceConsumerUtil.class); public static void trace(ConsumerRecord consumerRecord) { try { if(consumerRecord == null) { return; } Iterable<Header> headers = consumerRecord.headers().headers(TraceConstants.LOG_TRACE_ID); Header firstHeader = null; for(Header header : headers) { if(firstHeader != null) { break; } if(header != null) { firstHeader = header; } } if(firstHeader == null) { TraceUtil.setTraceId(TraceUtil.genTraceId()); }else { TraceUtil.setTraceId(new String(firstHeader.value())); } }catch (Exception ex) { LOGGER.warn("Kafka tracking ID setting failed, reason:" + ex.getMessage(), ex); } } }
生产者拦截器
public class TraceProducerInterceptor implements ProducerInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(TraceProducerInterceptor.class); @Override public ProducerRecord onSend(ProducerRecord record) { try { TraceUtil.loadTraceInfo(); String traceId = TraceUtil.getTraceId(); if(StrUtil.isNotBlank(traceId)) { record.headers().add(TraceConstants.LOG_TRACE_ID, traceId.getBytes()); } }catch (Exception ex) { LOGGER.warn("Kafka tracking ID setting failed, reason:" + ex.getMessage(), ex); } return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
2.6 线程池使用步骤
替换ThreadPoolExecutor为包装类
private static ThreadPoolExecutor executor = new TraceWrapUtil.ThreadPoolExecutorMdcWrapper(defaultPoolSize, defaultPoolSize, 10L,TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), new NameThreadFactory("warn.message.handler."), new AbortPolicy());