jjzjj

链路日志中追踪traceId

MikeTeas 2023-08-23 原文

一,使用traceId概述

平时出现问题查询日志是程序员的解决方式,日志一般会从服务器的日志文件,然后查找自己需要的日志,或者日志输出到es中,在es中进行搜索日志,可是对于目前流行的微服务或者单体服务,将日志串起来去查看并不是一件容易的事情,一般微服务会调用多个系统,有http请求的,有mq的等会产生大量的日志,根据日志快速定位到具体的问题才是我们想要的解决方案,毕竟要用最短的时间找到问题所在,并快速解决。目前的elk搜集日志,也只是把所有的日志搜集起来,并没有将具体的日志按照请求串起来,所以这个目前需要在日志中添加traceId进行日志的追踪。

二,请求的源头

1,http请求
思路
在最开始请求系统时候生成一个全局唯一的traceId,放在http 请求header中,系统接收到请求后,从header中取出这个traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期,即当一个服务调用另外一个服务的时候,需要将traceId往下传递,从而形成一条链路。
实现
@Service
public class TraceInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
String traceId = request.getHeader(ElkConstants.TRACE_ID);
if (StringUtils.isNotEmpty(traceId)) {
MDC.put(ElkConstants.TRACE_ID, traceId);
} else {
MDC.put(ElkConstants.TRACE_ID, UUID.randomUUID().toString());
}
return true;
}

    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
        MDC.remove(ElkConstants.TRACE_ID);
    }
}

@Configuration
public class InterceptorConfig implements WebMvcConfigurer {

    @Resource
    private TraceInterceptor traceInterceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(traceInterceptor).addPathPatterns("/**");
    }
}

public class ElkConstants {

/**
 * TRACE_ID
 */
public static final String TRACE_ID = "traceId";

}
2,定时任务Task
思路
在定时任务的时候生成一个全局唯一的traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期
实现
@Component
@Slf4j
public class SyncTask extends BaseTask {

/**
 * 定时任务
 */
public void sync() {
    //设置traceId
    setTraceId();
    //自己的业务逻辑
}

}
public abstract class BaseTask {

public void setTraceId() {
    MDC.put(ElkConstants.TRACE_ID, UUID.randomUUID().toString());
}

}
3,微服务(Feign)
思路
在请求的时候生成一个全局唯一的traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期
实现
@Configuration
public class FeignInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate requestTemplate) {
requestTemplate.header(ElkConstants.TRACE_ID, (String) MDC.get(ElkConstants.TRACE_ID));
}
}
4,Mq的方式
思路
使用mq进行消息发送的时候,可以将请求发送消息的traceId从mdc中取出来,我们可以放到消息体中,当做一个字段,然后在消息端消费的时候,从消息体中获取到traceId,并放入到MDC中,伴随着此次的请求
实现(kafka为列子,其他也可以按照思路进行不同的实现)
@Component
public class KafkaSender implements MessageSender {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

@Autowired
private KafkaConfig kafkaConfig;

private static final Logger log = LoggerFactory.getLogger(KafkaSender.class);

/**
 * 异步发送
 *
 * @param mqSendEvent
 * @return
 */
@Override
public boolean sendAsyncMessage(MqSendEvent mqSendEvent) {
   try {
        mqSendEvent.setTraceId(MDC.get("traceId"));
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(mqSendEvent.getTopic(),
                JSON.toJSONString(mqSendEvent));
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("kafka sendAsyncMessage error, ex = {}, topic = {}, data = {}", ex, mqSendEvent.getTopic(),
                        JSON.toJSONString(mqSendEvent));
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("kafka sendAsyncMessage success topic = {}, data = {}", mqSendEvent.getTopic(),
                        JSON.toJSONString(mqSendEvent));
            }
        });
    } finally {
        MDC.clear();
    }
    return true;
}

}

public interface MessageSender {

/**
 * 异步发送
 *
 * @param mqSendEvent
 */
public boolean sendAsyncMessage(MqSendEvent mqSendEvent);

}
public class MqSendEvent implements Serializable {
//topic
private String topic;
//数据
private String data;
//traceId数据
private String traceId;

public MqSendEvent(String topic, String data) {
    this.topic = topic;
    this.data = data;
}

public String getTopic() {
    return topic;
}

public void setTopic(String topic) {
    this.topic = topic;
}

public String getData() {
    return data;
}

public void setData(String data) {
    this.data = data;
}

public String getTraceId() {
    return traceId;
}

public void setTraceId(String traceId) {
    this.traceId = traceId;
}

}
@Component
@Slf4j
public class UserRegisterListener extends MessageBaseListener {

/**
 * 监听消息
 *
 * @param consumerRecord
 * @param ack
 */
@Override
@KafkaListener(topics = {MessageConstants.REGISTER_USER_TOPIC}, containerFactory = MessageConstants.CONSUMER_CONTAINER_FACTORY_NAME)
protected void receiverMessage(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
    try {
        log.info("consumer message record:{}", consumerRecord);
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object value = optional.get();
            MqSendEvent mqSendEvent = JSON.parseObject((String) value, MqSendEvent.class);
            JSONObject jsonObject = JSON.parseObject(mqSendEvent.getData());
            MDC.put(ElkConstants.TRACE_ID, mqSendEvent.getTraceId());
         //具体业务逻辑
        }
    } finally {
        MDC.clear();
    }
    ack.acknowledge();
}

}
5,多线程方式
思路
在定时任务的时候生成一个全局唯一的traceId,放入MDC中,这个traceId伴随着这整个请求的调用周期
实现

三,MDC实现链路追踪的原理

1,概述
MDC(Mapped Diagnostic Contexts),是Slf4J类日志系统中实现分布式多线程日志数据传递的重要工具,用户可利用MDC将一些运行时的上下文数据打印出来。目前只有log4j和logback提供原生的MDC支持
2,使用(具体不同场景使用见(二,请求源头))
在logback配置文件中配置MDC容器中的变量%X{trackId}


%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %X{trackId} [%15.15t] %class{39}.%method[%L] : %m%n
UTF-8f


3,源码分析
MDC中的put方法
public static void put(String key, String val) throws IllegalArgumentException {
if (key == null) {
throw new IllegalArgumentException(“key parameter cannot be null”);
} else if (mdcAdapter == null) {
throw new IllegalStateException(“MDCAdapter cannot be null. See also http://www.slf4j.org/codes.html#null_MDCA”);
} else {
mdcAdapter.put(key, val);
}
}
MDCAdapter接口
public interface MDCAdapter {
//设置当前线程MDC上下文中指定key的值和value的值
void put(String var1, String var2);
// 获取当前线程MDC上下文中指定key的值
String get(String var1);
// 移除当前线程MDC上下文中指定key的值
void remove(String var1);
// 清空MDC上下文
void clear();
// 获取MDC上下文
Map<String, String> getCopyOfContextMap();
// 设置MDC上下文
void setContextMap(Map<String, String> var1);
}
LogbackMDCAdapter实现

public class LogbackMDCAdapter implements MDCAdapter {
final ThreadLocal<Map<String, String>> copyOnThreadLocal = new ThreadLocal();
private static final int WRITE_OPERATION = 1;
private static final int MAP_COPY_OPERATION = 2;
//threadlocal线程级别的,这就是为什么需要remove或者clear的原因所在,
//防止内存泄露,具体为啥,可以研究一下ThreadLocal底层原理就明白了
final ThreadLocal lastOperation = new ThreadLocal();

public LogbackMDCAdapter() {
}
......

可以看到LogbackMDCAdapter声明了类型为ThreadLocal的map。ThreadLocal 提供了线程本地的实例。ThreadLocal变量在线程之间隔离而在方法或类间能够共享,是属于线程单独私有的,线程之间相互隔离,到这里就可以大致理解为其实使用的就是ThreadLocal的私有线程的特性,大概就可以明白其中的原理了。
LogbackMDCAdapter 的put方法
public void put(String key, String val) throws IllegalArgumentException {
if (key == null) {
throw new IllegalArgumentException(“key cannot be null”);
} else {
//复制当前线程的threadLocal
Map<String, String> oldMap = (Map)this.copyOnThreadLocal.get();
//设置当前操作为写操作,并返回上一次的操作
Integer lastOp = this.getAndSetLastOperation(1);
//上一次不是读操作或者null,已经初始化了,有内容的话在里面设置内容
if (!this.wasLastOpReadOrNull(lastOp) && oldMap != null) {
oldMap.put(key, val);
} else {
// 复制一个当前线程的副本
Map<String, String> newMap = this.duplicateAndInsertNewMap(oldMap);
newMap.put(key, val);
}

    }
}

private Integer getAndSetLastOperation(int op) {
    //设置写操作,并返回上一次操作
    Integer lastOp = (Integer)this.lastOperation.get();
    this.lastOperation.set(op);
    return lastOp;
}

private boolean wasLastOpReadOrNull(Integer lastOp) {
    //判断操作类型 是null或者读操作
    return lastOp == null || lastOp == 2;
}

创建线程安全的map放到threadLocal中
private Map<String, String> duplicateAndInsertNewMap(Map<String, String> oldMap) {
Map<String, String> newMap = Collections.synchronizedMap(new HashMap());
if (oldMap != null) {
synchronized(oldMap) {
newMap.putAll(oldMap);
}
}

    this.copyOnThreadLocal.set(newMap);
    return newMap;
}

get方法
public String get(String key) {
//从当前线程获取map
Map<String, String> map = (Map)this.copyOnThreadLocal.get();
return map != null && key != null ? (String)map.get(key) : null;
}
remove方法
public void remove(String key) {
if (key != null) {
Map<String, String> oldMap = (Map)this.copyOnThreadLocal.get();
if (oldMap != null) {
//设置为写操作
Integer lastOp = this.getAndSetLastOperation(1);
if (this.wasLastOpReadOrNull(lastOp)) {
//读操作或者null的时候,复制新map并移除当前key
Map<String, String> newMap = this.duplicateAndInsertNewMap(oldMap);
newMap.remove(key);
} else {
oldMap.remove(key);
}

        }
    }
}

clear方法
public void clear() {
//设置为写操作
this.lastOperation.set(1);
//移除复制
this.copyOnThreadLocal.remove();
}

四,MDC使用总结

mdc就是基于Threadlocal进行的一个流程周转的标志物的传递,就是根据这种标志,可以追踪到日志的整体请求记录,便于进行定位到问题所在,而且对于用户的影响极小

有关链路日志中追踪traceId的更多相关文章

  1. ruby - Sinatra 中的全局救援和日志记录异常 - 2

    如何在出现异常时指定全局救援,如果您将Sinatra用于API或应用程序,您将如何处理日志记录? 最佳答案 404可以在not_found方法的帮助下处理,例如:not_founddo'Sitedoesnotexist.'end500s可以通过调用带有block的错误方法来处理,例如:errordo"Applicationerror.Plstrylater."end错误的详细信息可以通过request.env中的sinatra.error访问,如下所示:errordo'Anerroroccured:'+request.env['si

  2. ruby-on-rails - 使用 Ruby 标准 Logger 每天只创建一个日志 - 2

    我正在使用ruby​​标准记录器,我想要每天轮换一次,所以在我的代码中我有:Logger.new("#{$ROOT_PATH}/log/errors.log",'daily')它运行完美,但它创建了两个文件errors.log.20130217和errors.log.20130217.1。如何强制它每天只创建一个文件? 最佳答案 您的代码对于长时间运行的应用程序是正确的。发生的事情是您在给定的一天多次运行代码。第一次运行时,Ruby会创建一个日志文件“errors.log”。当日期改变时,Ruby将文件重命名为“errors.log

  3. ruby - Cucumber/Savon 省略或删除日志输出 - 2

    在运行Cucumber测试时,我得到(除了测试结果)大量调试/日志相关的输出形式:D,[2013-03-06T12:21:38.911829#49031]DEBUG--:SOAPrequest:D,[2013-03-06T12:21:38.911919#49031]DEBUG--:Pragma:no-cache,SOAPAction:"",Content-Type:text/xml;charset=UTF-8,Content-Length:1592W,[2013-03-06T12:21:38.912360#49031]WARN--:HTTPIexecutesHTTPPOSTusingt

  4. ruby-on-rails - faraday如何设置日志级别 - 2

    我最近将我的http客户端切换到faraday,一切都按预期工作。我有以下代码来创建连接:@connection=Faraday.new(:url=>base_url)do|faraday|faraday.useCustim::Middlewarefaraday.request:url_encoded#form-encodePOSTparamsfaraday.request:jsonfaraday.response:json,:content_type=>/\bjson$/faraday.response:loggerfaraday.adapterFaraday.default_ada

  5. 网站日志分析软件--让网站日志分析工作变得更简单 - 2

    网站的日志分析,是seo优化不可忽视的一门功课,但网站越大,每天产生的日志就越大,大站一天都可以产生几个G的网站日志,如果光靠肉眼去分析,那可能看到猴年马月都看不完,因此借助网站日志分析工具去分析网站日志,那将会使网站日志分析工作变得更简单。下面推荐两款网站日志分析软件。第一款:逆火网站日志分析器逆火网站日志分析器是一款功能全面的网站服务器日志分析软件。通过分析网站的日志文件,不仅能够精准的知道网站的访问量、网站的访问来源,网站的广告点击,访客的地区统计,搜索引擎关键字查询等,还能够一次性分析多个网站的日志文件,让你轻松管理网站。逆火网站日志分析器下载地址:https://pan.baidu.

  6. ruby - 如何更改 Sinatra 中的日志级别 - 2

    我正在使用此代码在我的Sinatra应用程序中启用日志记录:log_file=File.new('my_log_file.log',"a")$stdout.reopen(log_file)$stderr.reopen(log_file)$stdout.sync=true$stderr.sync=true实际的日志记录是使用:logger.debug("Startingcall.Params=#{params.inspect}")事实证明,只有INFO或更高级别的日志消息被记录,而DEBUG消息没有被记录。我正在寻找一种将日志级别设置为DEBUG的方法。 最佳

  7. ruby - 带有 grep 远程日志文件的 tail - 2

    我有这段代码来跟踪远程日志文件:defdo_tail(session,file)session.open_channeldo|channel|channel.on_datado|ch,data|puts"[#{file}]->#{data}"endchannel.exec"tail-f#{file}"endNet::SSH.start("host","user",:password=>"passwd")do|session|do_tailsession,"/path_to_log/file.log"session.loop我只想在file.log中检索带有ERROR字符串的行,我正在尝

  8. Ruby 守护进程日志轮换 - 2

    当我为Daemons(1.1.0)gem设置日志记录参数时,我将如何实现与此行类似的行为?logger=Logger.new('foo.log',10,1024000)守护进程选项:options={:ARGV=>['start'],:dir_mode=>:normal,:dir=>log_dir,:multiple=>false,:ontop=>false:mode=>:exec,:backtrace=>true,:log_output=>true} 最佳答案 不幸的是,Daemonsgem不使用Logger。它将STDOUT和S

  9. ruby-on-rails - 在 Rails 应用程序的前端获取实时日志 - 2

    在Rails3.x应用程序中,我正在使用net::ssh并向远程pc运行一些命令。我想向用户的浏览器显示实时日志。比如,如果两个命令在net中运行::ssh执行即echo"Hello",echo"Bye"被传递然后"Hello"应该在执行后立即显示在浏览器中。这是代码我在ruby​​onrails应用程序中使用ssh连接和运行命令Net::SSH.start(@servers['local'],@machine_name,:password=>@machine_pwd,:timeout=>30)do|ssh|ssh.open_channeldo|channel|channel.requ

  10. ruby - gem 应该在哪里存储日志文件? - 2

    我正在构建一个应该输出日志文件的ruby​​gem。将日志文件存储在哪里是一个好习惯?我正在从我正在构建的Rails网站中提取此功能,我可以在那里简单地登录到log/目录。 最佳答案 理想情况下,使路径可配置(.rc文件、交换机、rails/rack配置等)。如果它是一个Rack中间件,添加在构造函数的参数中指定它的可能性。如果没有提供日志路径,回退到检测日志目录。(我依稀记得它是Rails中的config.paths['log'],但如果可以的话,请确保config在你的gem中使用之前确实指向某些东西在Rails之外使用。)如果

随机推荐