publicclassThreadLocalDemo{publicstaticvoidmain(String[]args){Service2service2=newService2();Service1service1=newService1(service2);// 两个线程模拟两个请求Threadrequest1=newThread(service1::service1);Threadrequest2=newThread(service1::service1);request1.start();request2.start();}}/**
* 全局上下文,管理TRACE_ID
*/classTraceContext{// API文档中推荐ThreadLocal声明为private staticprivatestaticfinalThreadLocal<String>CONTEXT=newThreadLocal<>();publicstaticvoidset(StringtraceId){CONTEXT.set(traceId);}publicstaticStringget(){returnCONTEXT.get();}publicstaticvoidremove(){CONTEXT.remove();}}/**
* 负责创建TRACE_ID,调用Service2
*/classService1{privatestaticfinalStringTRACE_ID_PREFIX="X-TRACE-ID-";privatestaticfinalAtomicIntegerID=newAtomicInteger(1);privateService2service2;publicService1(Service2service2){this.service2=service2;}publicvoidservice1(){// 在service1中先设置TRACE_IDTraceContext.set(TRACE_ID_PREFIX+ID.getAndIncrement());// 打印新生成的TRACE_ID,模拟一些业务操作System.out.println("generate new TRACE_ID: "+TraceContext.get()+" -> do something in service1, current thread is: "+Thread.currentThread());// 再调用service2service2.service2();}}/**
* 使用TRACE_ID,并完成清理工作
*/classService2{publicvoidservice2(){// 打印TRACE_ID, 模拟对TRACE_ID的操作System.out.println("current TRACE_ID is: "+TraceContext.get()+" -> do something in service2, current thread is: "+Thread.currentThread());// 操作完成后,手动清理TraceContext.remove();}}
在解释具体代码之前,首先要搞清楚 ThreadLocal、Thread 及 ThreadLocal 中实际保存的 value 的关系,下面是 ThreadLocal 源码中的一段注释:
1
2
3
4
5
6
7
8
9
10
11
/**
* ThreadLocals rely on per-thread linear-probe hash maps attached
* to each thread (Thread.threadLocals and
* inheritableThreadLocals). The ThreadLocal objects act as keys,
* searched via threadLocalHashCode. This is a custom hash code
* (useful only within ThreadLocalMaps) that eliminates collisions
* in the common case where consecutively constructed ThreadLocals
* are used by the same threads, while remaining well-behaved in
* less common cases.
*/privatefinalintthreadLocalHashCode=nextHashCode();
// .../* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */ThreadLocal.ThreadLocalMapthreadLocals=null;/*
* InheritableThreadLocal values pertaining to this thread. This map is
* maintained by the InheritableThreadLocal class.
*/ThreadLocal.ThreadLocalMapinheritableThreadLocals=null;// ...
privatefinalintthreadLocalHashCode=nextHashCode();/**
* The next hash code to be given out. Updated atomically. Starts at
* zero.
*/privatestaticAtomicIntegernextHashCode=newAtomicInteger();/**
* The difference between successively generated hash codes - turns
* implicit sequential thread-local IDs into near-optimally spread
* multiplicative hash values for power-of-two-sized tables.
*/privatestaticfinalintHASH_INCREMENT=0x61c88647;/**
* Returns the next hash code.
*/privatestaticintnextHashCode(){returnnextHashCode.getAndAdd(HASH_INCREMENT);}
/**
*...
* To help deal with very large and long-lived usages, the hash table entries use
* WeakReferences for keys.
*...
*/staticclassThreadLocalMap{// .../**
* The entries in this hash map extend WeakReference, using
* its main ref field as the key (which is always a
* ThreadLocal object). Note that null keys (i.e. entry.get()
* == null) mean that the key is no longer referenced, so the
* entry can be expunged from table. Such entries are referred to
* as "stale entries" in the code that follows.
*/staticclassEntryextendsWeakReference<ThreadLocal<?>>{/** The value associated with this ThreadLocal. */Objectvalue;Entry(ThreadLocal<?>k,Objectv){super(k);value=v;}}// ...}
rehash 操作前,会先进行一次 cleanSomeSlots 清理操作,这个方法在源码注释中使用了 Heuristically(启发式地) 进行描述,因此这里简称它为 启发式清理。而在 rehash 方法中,在调用 resize 方法扩容前,还会调用另外一个 expungeStaleEntries 清理操作,熟悉的词汇,在源码注释中描述为 Expunge all stale entries in the table(清理所有 stale entry),它本质上是调用了 expungeStaleEntry 方法,而 expungeStaleEntry 方法是对哈希表中的 stale entry 进行部分清理,后面就简称它为 分段式清理。
两个清理工作完成后,才开始正式的 resize 扩容流程,新建一个两倍容量的数组,将旧表中的元素转移到新表,同时清理一些 stale entry。
/**
* Expunge all stale entries in the table.
*/privatevoidexpungeStaleEntries(){Entry[]tab=table;intlen=tab.length;// 遍历哈希表每个位置,对stale entry进行清理 for(intj=0;j<len;j++){Entrye=tab[j];if(e!=null&&e.get()==null)expungeStaleEntry(j);}}
/**
* Heuristically scan some cells looking for stale entries.
* This is invoked when either a new element is added, or
* another stale one has been expunged. It performs a
* logarithmic number of scans, as a balance between no
* scanning (fast but retains garbage) and a number of scans
* proportional to number of elements, that would find all
* garbage but would cause some insertions to take O(n) time.
*
* @param i a position known NOT to hold a stale entry. The
* scan starts at the element after i.
*
* @param n scan control: {@code log2(n)} cells are scanned,
* unless a stale entry is found, in which case
* {@code log2(table.length)-1} additional cells are scanned.
* When called from insertions, this parameter is the number
* of elements, but when from replaceStaleEntry, it is the
* table length. (Note: all this could be changed to be either
* more or less aggressive by weighting n instead of just
* using straight log n. But this version is simple, fast, and
* seems to work well.)
*
* @return true if any stale entries have been removed.
*/privatebooleancleanSomeSlots(inti,intn){booleanremoved=false;Entry[]tab=table;intlen=tab.length;do{i=nextIndex(i,len);Entrye=tab[i];if(e!=null&&e.get()==null){n=len;removed=true;i=expungeStaleEntry(i);}}while((n>>>=1)!=0);returnremoved;}
这里把源码中的所有注释都搬进来了,非常详细的一段注释,从设计思想到各参数的详细讲解,应有尽有。代码不长,核心循环的工作是以 i 为起点对哈希表进行扫描(注释中重点写明这个起始 i 位置一定 不是 stale entry),判断是否存在 stale entry。如果一直没扫描到,那么在扫描 $log_2 n$ 次后就结束循环,返回 false。如果扫描到存在 stale entry,那么 cleanSomeSlots 调用我们刚介绍过的 expungeStaleEntry 进行清理,i 的值将直接跳到被清理键簇的紧邻 null 位置,并且会将扫描次数扩大,进行额外的 $log_2 (table.length)-1$ 次扫描。
每次发现 stale entry,就会重新将扫描次数进行增加,哈希表中的 stale entry 越多,扫描的次数就会越多,进行的清理操作就越多,这就是一个逐步启发的过程。代码注释中说到这种方式是一种折中的实现,在完全不进行扫描和全局扫描之间找到一个平衡点。
这个方法会在两个地方被调用,第一个是在 set 方法的末尾,新增元素成功后,在 rehash 之前进行一次启发式清理,这时候传入的两个参数分别为新增元素的位置 i 及新增后所有元素的个数 sz。
privatevoidreplaceStaleEntry(ThreadLocal<?>key,Objectvalue,intstaleSlot){Entry[]tab=table;intlen=tab.length;Entrye;// Back up to check for prior stale entry in current run. // We clean out whole runs at a time to avoid continual // incremental rehashing due to garbage collector freeing // up refs in bunches (i.e., whenever the collector runs). intslotToExpunge=staleSlot;for(inti=prevIndex(staleSlot,len);(e=tab[i])!=null;i=prevIndex(i,len))if(e.get()==null)slotToExpunge=i;// Find either the key or trailing null slot of run, whichever // occurs first for(inti=nextIndex(staleSlot,len);(e=tab[i])!=null;i=nextIndex(i,len)){ThreadLocal<?>k=e.get();// If we find key, then we need to swap it // with the stale entry to maintain hash table order. // The newly stale slot, or any other stale slot // encountered above it, can then be sent to expungeStaleEntry // to remove or rehash all of the other entries in run. if(k==key){e.value=value;tab[i]=tab[staleSlot];tab[staleSlot]=e;// Start expunge at preceding stale entry if it exists if(slotToExpunge==staleSlot)slotToExpunge=i;cleanSomeSlots(expungeStaleEntry(slotToExpunge),len);return;}// If we didn't find stale entry on backward scan, the // first stale entry seen while scanning for key is the // first still present in the run. if(k==null&&slotToExpunge==staleSlot)slotToExpunge=i;}// If key not found, put new entry in stale slot tab[staleSlot].value=null;tab[staleSlot]=newEntry(key,value);// If there are any other stale entries in run, expunge them if(slotToExpunge!=staleSlot)cleanSomeSlots(expungeStaleEntry(slotToExpunge),len);}
这也是个非常繁琐的方法,但是注释内容较多,理解起来也很方便。
这个方法是在 set 中被调用的,在线性探测插入(或修改)元素时,如果遇到了 stale entry,那么就进入到 replaceStaleEntry,传入的参数为元素的 key、value 以及 stale entry 的位置 i。
第二个循环过程中,如果找到了满足 k == key 条件的 Entry,那么就会进入替换及清理的代码中:
1
2
3
4
5
6
7
8
9
10
11
12
if(k==key){e.value=value;tab[i]=tab[staleSlot];tab[staleSlot]=e;// Start expunge at preceding stale entry if it exists if(slotToExpunge==staleSlot)slotToExpunge=i;cleanSomeSlots(expungeStaleEntry(slotToExpunge),len);return;}
staleSlot 是调用 replaceStaleEntry 方法时传入的参数,也就是 set 方法调用过程中发现的第一个 stale entry 的位置。这里先将当前 Entry 的 value 进行了替换修改,然后将当前位置 i 与 staleSlot 位置的元素进行了交换,交换过后,i 位置变为 stale entry,而 staleSlot 位置成为了有效 entry。
这段代码就是 replaceStaleEntry 命名的由来,它将原来 set 中识别出的 stale entry 替换为了一个新的有效 entry(key 是原来已经存在的,仅修改了 value)。下图中,K8 == K8',当 i == 4 时,进入上述逻辑中,先将 K8' 的 value 进行替换修改,再将 K5 与 K8' 进行交换,得到下面的成果。
替换成功后,随后条件判断与步骤 3 逻辑相同,都是确定 slotToExpunge 的位置,此时的 i 位置已经是 stale entry 了,因此可以作为 expungeStaleEntry分段式清理 的起点。
最后就是进行两次清理,先分段清理,再将其返回值传入 cleanSomeSlots 进行启发式清理,启发式清理中的第二个参数为 len,即哈希表当前的最大容量,区别 set 方法末尾的参数传入的 sz。
若第二个循环中没有找到能够替换的 Entry,则进入到最后的新建逻辑:
1
2
3
4
5
6
7
// If key not found, put new entry in stale slot tab[staleSlot].value=null;tab[staleSlot]=newEntry(key,value);// If there are any other stale entries in run, expunge them if(slotToExpunge!=staleSlot)cleanSomeSlots(expungeStaleEntry(slotToExpunge),len);
staleSlot 处成为新元素插入的位置,如果在第二个循环中发现了其他 stale entry,就进行两步清理工作。