赞
踩
生成测试数据
@Slf4j public final class Demo{ static final String ALPHA = "abcdefghijklmnopqrstuvwxyz"; public static void main(String[] args){ int length = ALPHA.length(); int count = 200; // 每个字母添加200个 List<String> list = new ArrayList<>(length*count); for(int i=0;i<length;i++){ char ch = ALPHA.charAt(i); for (int j = 0; j < count; j++) { list.add(String.valueOf(ch)); } } // 打乱 Collections.shuffle(list); for (int i = 0; i < 26; i++) { try(PrintWriter printWriter = new PrintWriter( new OutputStreamWriter( new FileOutputStream("d://tmp/"+(i+1)+".txt")))){ String collect = list.subList(i*count,(i+1)*count).stream().collect(Collectors.joining()); printWriter.print(collect); }catch (FileNotFoundException e){ e.printStackTrace(); } } } }
字母计数实现
@Slf4j public final class Demo{ static final String ALPHA = "abcdefghijklmnopqrstuvwxyz"; public static void main(String[] args) throws BrokenBarrierException, InterruptedException { // 使用累加器作为值,方便统计时的累加操作 ConcurrentHashMap<Character, LongAdder> map = new ConcurrentHashMap<>(26); // 创建一个核心线程数和最大线程数相等的线程池,阻塞队列为SynchronousQueue大小为0 ThreadPoolExecutor pool = new ThreadPoolExecutor(26,26,0, TimeUnit.MILLISECONDS, new SynchronousQueue<>()); CyclicBarrier barrier = new CyclicBarrier(27); // 提交26个读取任务 for (int i = 0; i < 26; i++) { int t = i; pool.submit(()->{ try(FileReader reader = new FileReader("d://tmp/"+(t+1)+".txt")){ int c = 0; while((c=reader.read())!=-1){ // computeIfAbsent可以原子的判断键是否存在,如果不存在则创建;因为判断是否存在和创建是两步操作,所以需要使用该方法保证原子性 LongAdder adder = map.computeIfAbsent((char)c,key->new LongAdder()); // 得到创建好的累加器,进行累加,每一个提交到累加器上的操作是原子的 adder.increment(); } } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } try { barrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }); } // 等待所有线程执行结束 barrier.await(); printMap(map); } private static <K,V> void printMap(Map<K,V> map){ for(K key:map.keySet()){ System.out.println(key+":"+map.get(key)); } } }
注意
ConcurrentHashMap线程安全的含义是方法是原子性的,但是上述操作中如果使用Integer作为值,先获取Integer再累加,或是先判断Integer是否存在再创建这都是在方法层面以外的两个操作,虽然每个操作都具有原子性,但是放在一起不具有原子性了
1. JDK1.7HashMap死链
由于JDK7中,链表使用的是头插法,在扩容时会导致原有的链表顺序被逆序;且JDK7中的扩容后的rehash操作是直接在原数组上进行的,当两个线程同时扩容时,假设链表:a-b-c-null,线程1在rehash时会将尾部结点插在头部,变为:c-a-b-null,假设本来此时线程2已经到了c,接下来到null就结束循环,但由于线程1的影响导致c.next变为a所以会继续循环,同理线程1也会收到线程2的影响进入死循环
2. JDK1.8解决死链问题
JDK1.8中增加元素改为了尾插法,不会改变顺序,而且扩容后会创建一个新的数组并且将原数组的数据通过rehash将低位的结点放在新数组的原位置,高位的结点往后移动原容量的距离,解决了死链问题
3. JDK1.8数据覆盖问题
假设当前HashMap中某个容器中没有元素,此时插入数据不会遇到哈希碰撞则会直接插入元素,假设线程1在经过哈希碰撞判断后时间片用尽被挂起,此时线程2也经过哈希碰撞的判断将数据插入了容器,这时线程1醒来后也会将数据直接插入而不是接在线程2插入的后面(此时已经存在了哈希碰撞但是线程1不察觉),导致线程2的数据被覆盖
4. JDK1.8size不一致问题
因为size没有被volatile修饰,所以工作内存中size的变化是互相不可见的,假设线程1和线程2都读取到size为1,当线程1将size增加为2后线程2未知,也会将size增加为2这样size就丢失了1的大小
重要属性和内部类
// 当初始化或扩容完成后,为下一次扩容的阈值大小 // 默认为0,初始化时为-1;扩容时为-(1+扩容线程数) private transient volatile int sizeCtl; // 内部的结点类,整个ConcurrentHashMap就是一个Node[] static class Node<K,V> implements Map.Entry<K,V>{} // 哈希表容器数组 transient volatile Node<K,V>[] table; // 扩容时的新数组 private transient volatile Node<K,V>[] nextTable; // 在扩容时,会在原数组从后往前遍历,每遍历完一个容器就在容器中加上一个ForwardingNode表示该容器上的结点已经转移到新数组中,此时其他线程获取结点时就知道该去新数组中获取 static final class ForwardingNode<K,V> extends Node<K,V>{} // 用在compute以及computeIfAbsent进行计算时,用来占位,计算完成后替换为普通的Node static final class ReservationNode<K,V> extends Node<K,V>{} // 在map从链表升级为红黑树时,原数组的头结点会用TreeBin代替 static final class TreeBin<K,V> extends Node<K,V>{} // 在map从链表升级为红黑树时,结点将会转化成TreeNode static final class TreeNode<K,V> extends Node<K,V>{}
重要方法
// 获取Node[]中第i个Node,即第i个容器的头结点
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab,int i)
// 通过cas修改Node[]中第i个Node的值,c为旧值,v为新值
static final <K,V> boolean casTabAt(Node<K,V>[] tab,int i,Node<K,C> c,Node<K,V> v)
//直接求改Node[]中第i个Node的值,v为新值
static final <K,V> void setTabAt(Node<K,V>[] tab,int i,Node<K,V> v)
构造器分析
实现了懒惰初始化,在构造器中仅仅计算了table的大小,并没有初始化table,在第一次使用的时候才会真正的创建
/** * @param initialCapacity 初始容量 * @param loadFactor 装载因子 * @param concurrencyLevel 并发度 */ public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel){ // 参数校验,装载因子和初始容量不能小于0,并发度必须大于0 if(!(loadFactor> 0.0f)|| initialCapacity<0||concurrencyLevel<=0) throw new IllegalArgumentException(); // 初始容量如果小于并发度,则直接改为并发度,减少竞争 if(initialCapacity < concurrencyLevel) initialCapacity = concurrencyLevel; // 通过装载因子计算大小 long size = (long)(1.0+(long)initialCapacity/loadFactor); // tableSizeFor 是为了保证计算的大小是2^n,即16,32,64等等;所以ConcurrentHashMap的大小并不一定是创建时指定的大小 int cap = (size >= (long)MAXIMUM_CAPACITY)? MAXIMUM_CAPACITY : tableSizeFor((int)size); // 将计算出来的大小赋值给sizeCtl,第一次使用时用这个值来初始化数组 this.sizeCtl = cap; }
get流程
public V get(Object key) { Node<K,V>[] tab;Node<K,V> e,p;int n,eh;K ek; // spread方法能确保hashCode为正整数 int h = spread(key,hashCode()); // 这两步判断table是否非空且有数据,否则直接返回null if((tab==table) != null && (n==tab.length) > 0 && // 在初始化已经保证了table长度为2^n,除数满足是2^n时,按位与可以代替取模,同时按位与效率更高 // 这一步是为了找到目标容器并判断,通过哈希计算(取模/按位与)找到对应的数组下标,如果该容器为空,也直接返回null (e=tabAt(tab, (n-1)&h)) != null) { // 先比较头结点的哈希码,如果头结点哈希码和目标结点的哈希码相同,再进一步判断结点的键是否相同(先比较哈希码——>再使用equals比较key),都相同表示头结点就是目标结点,直接返回值 if(eh = e.hash) == h) { if((ek = e.key) == key || (ek!=null&&key.equals(ek))) return e.val; }else if(eh<0) // 如果头结点的哈希码是负数,则表示该节点是ForwordingNode或者TreeBin,则需要通过find方法来获取 return (p=e.find(h,key)) !=null?p.val:null; // 向下遍历结点,找到一个满足哈希码相同,key的equals为true的结点,返回 while((e = e.next)!=null){ if(e.hash == h && ((ek = e.key) == key|| (ek !=null&&key.equals(ek)))) return e.val; } } return null; }
put流程
public V put(K key,V value){ return putVal(key,value,false); } // onlyIfAbsent如果为true则表示只有当前结点不存在时才会添加数据,如果已经存在则不会添加 final V putVal(K key,V value,boolean onlyIfAbsent){ // 键值不许为null,HashMap允许键值为null(Hashtable不允许键值为null) if(key==null||value==null) throw new NullPointerException(); // 保证key的哈希码为正整数 int hash = spread(key.hashCode()); int binCount = 0; // 进入死循环 for(Node<K,V>[] tab = table;;){ Node<K,V> f;int n,i,fh; // 如果哈希表为空或者长度为0,则调用initTable通过cas的方式初始化哈希表 if(tab==null||(n=tab.length)==0) tab = initTable(); // 通过哈希计算找到目标容器,如果目标容器头结点为null,则表示容器中还没有结点,则直接使用cas来修改目标容器的头结点,如果cas成功则表示插入成功,退出循环;如果此时有另一个线程提前初始化了头结点,则进入下一轮循环,重新插入 else if((f = tabAt(tab,i=(n-1)&hash)) == null){ if(casTabAt(tab,i,null,new Node<K,V>(hash,key,value,null))) break; } // 如果哈希码为MOVED(-1)表示正在扩容,则会帮助扩容线程进行扩容 else if((fh = f.hash)==MOVED) tab = helpTransfer(tab,f); // 进入这个分支表示桶下标冲突了,此时需要使用到synchronized独占锁 else { V oldVal = null; // 锁住链表的头结点,即锁住当前容器 synchronized(f){ // 再次确认头结点没有被移动 if(tabAt(tab,i) == f){ // 头结点哈希码大于0,表示当前还是链表,而不是红黑树 if(fh >= 0){ binCount = 1; // 开始遍历链表找到具有相同key的结点 for(Node<k,V> e = f;;++binCount){ K ek; // 比较哈希码与equals,取出旧值 if(e.hash == hash &&((ek=e.key)==key||(ek!=null&&key.equals(ek)))){ oldVal = e.val; // 如果传入的onlyIfAbsent为false,即结点不存在也会插入数据,则将结点的值改为新值 if(!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; // 如果已经遍历遍历到最后一个结点,则将新的结点添加到最后 if((e = e.next)==null){ pred.next = nwe Node<K,V>(hash,key,value,null); break; } } } // 如果头结点的哈希码小于0,表示当前容器中已经升级为红黑树 else if(f instanceof TreeBin){ Node<K,V> p; binCount = 2; // putTreeVal方法,会看key是否已经在书中,如果在则返回对应的TreeNode if((p=((TreeBin<K,V> f).putTreeVal(hash,key,value))!=null){ oldVal = p.val; // 取得树中的结点后,将新制插入 if(!onlyIfAbsent) p.val = value; } } } } if(binCount !=0 ){ // binCount在遍历链表的时候,会统计链表长度,如果链表长度达到了树化阈值(8),则调用treeifyBin,内部会先判断当哈希表长度达到64后,如果链表长度还是达到了这个阈值,则会将链表转换为红黑树 if(binCount >= TREEIFY_THREASHOLD) treeifyBin(tab,i); if(oldVal!=null) return oldVal; break; } } } // 增加size的计数 addCount(1L,binCount); return null; } private final Node<K,V> initTable(){ Node<K,V>[] tab;int src; // 如果哈希表为null或者长度为0,表示哈希表还没有创建,则会循环不断尝试创建 while((tab = table)==null || tab.length==0){ // 如果sizeCtl小于0,表示已经有线程在扩容了,则调用yield方法让出cpu资源 if((sc = sizeCtl) < 0) Thread.yield(); // 尝试通过cas将sizeCtl设置为-1,这个sizeCtl充当了一个cas锁的标志,只有一个初始化线程能够修改成功 else if(U.compareAndSwapInt(this,SIZECTL,sc,-1)){ try{ // 再次判断哈希表是否被初始化过 if((tab=table)==null||tab.length==0){ // 如果sizeCtl中存储的构造时计算出的大小不大于0,则使用默认大小 int n = (sc>0)?sc:DEFAULT_CAPACITY; Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; // 再次计算sc,表示下次要扩容时的阈值 sc = n-(n>>>2); } }finally{ // 将计算出来的阈值赋值给sizeCtl sizeCtl = sc; } break; } } return tab; } /** * addCount方法类似LongAddr的累加,会创建多个累加单元进行累加,最后汇总 */ // check是之前binCount的个数 private final void addCount(long x,int check) { CounterCell[] as;long b,s; if( // 如果已经有了counterCells则向cell累加 (as=counterCells)!=null|| // 如果还没有创建counterCells,向baseCount累加 !U.compareAndSwapLong(this,BASECOUNT,b=baseCount,s=b+x) ){ CounterCell a;long v;int m; boolean uncontended = true; if( // 还没有counterCells as == null || (m=as.length-1) < 0|| // 还没有cell (a = as[ThreadLocalRandom.getProbe() % m]) == null || // 使用cas进行累加操作,如果失败了表示冲突,则创建新的累加单元 !(uncontended = U.compareAndSwapLong(a,CELLVALUE,v=a.value,v+x)) ){ // 创建累加单元数组和cell,累加重试 fullAddCount(x,uncontended); return; } if(check<=1) return; // 获取元素个数 s = sumCount(); } if(check>=0){ Node<K,V>[] tab,nt;int n,sc; // 如果元素个数大于阈值,且表不为空,且数组长度没有超过最大值,表示需要扩容 // 循环的扩容,如果一次扩容失败或者不能达到预期,则会再次扩容 while(s>=(long)(sc=sizeCtl)&&(tab=table)!=null && (n = tab.length)<MAXIMUM_CAPATICY){ int rs = resizeStamp(n); if(sc<0){ if((sc>>>RESIZE_STAMP_SHIFT!=rs||sc==rs+1|| sc==rs+MAX_RESIZERS||(nt=nextTable)==null|| transferIndex<=0) break; // 如果newtable已经创建了,则帮忙扩容 if(U.compareAndSwapInt(this,SIZECTL,sc,sc+1)) transfer(tab,nt); } // newtable还没有创建 else if(U.compareAndSwapInt(this,SIZECEL,sc,(rs<<RESIZE_STAMP_SHIFT)+2)) transfer(tab,null); s = sumCount(); } } }
size计算流程
size计算实际发生在put,remove改变集合元素的操作之中;size计算之后只能得到一个大概值,得不到一个精确值
public int size(){ long n = sumCount(); return ((n<0L)?0: (n>(long)Integer.MAX_VALUE)?Integer.MAX_VALUE: (int)n); } final long sumCount(){ CounterCell[] as = counterCells;CounterCell a; // 将baseCount计数与所有cell计数累加 long sum = baseCount; if(as!=null){ for(int i=0;i<as.length;++i){ if((a=as[i])!=null) sum += a.value; } } return sum; }
transfer扩容流程
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // 如果nextTab为null,则初始化nextTab if (nextTab == null) { try { @SuppressWarnings("unchecked") // 在原有的table基础上长度×2 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // 以链表为单位,旧数组往新数组的搬迁 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; } } // 如果链表头为null,将链表头替换成forwardingNode else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); // 如果链表头已经是forwardingNode,则进入下一轮循环处理下一个链表 else if ((fh = f.hash) == MOVED) advance = true; // 处理链表 else { // 锁住链表头 synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; // 头结点的哈希码>=0表示是普通的链表结点 if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } // 红黑树结点 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
在jdk7中,维护了一个segment数组,每个segment对应一把锁,Segment继承自ReentrantLock
构造器分析
public ConcurrentHashMap(int initialCapacity,float loadFactor,int concurrencyLevel) { // 参数有效性分析 if(!(loadFactor>0) || initailCapacity < 0||concurrency <= 0) throw new IllegalArgumentException(); int sshift = 0; int ssize = 1; // 保证ssize达到并发度并且为2^n while(ssize<concurrencyLevel) { ++sshift; ssize<<=1; } // segmentShift默认是32-4=28 this.segmentShift = 32-sshift; // segmentMask默认是15,即0000 0000 0000 1111 this.segmentMask = ssize-1; if(initialCapacity > MAXIMUM_CAPACITY) initailCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if(c*ssize<initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while(cap<c) cap<<=1; // 创建segments and segments[0] // 一个segment对应一个哈希表 Segment<K,V> s0 = new Segment<K,V>(loadFactor,(int)(cap*loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss,SBASE,s0); this.segments = ss; }
segment定位
put流程
public V put(K key,V value) { Segment<K,V> s; if(value==null) throw new NullPointerException(); int hash = hash(key); // 计算segment的下标 int j = (hash>>>segmentShifft)&segmentMask; // 获去segment对象,判断是否为null,是则创建该segment,只有segment数组和segments[0]不是懒惰初始化,其他segment还是懒惰初始化的 if((s=(Segment<K,V> UNSAFE.getObject (segments,(j<<SSHIFT)+SBASE)) == NULL){ // 这时不能确定是否真的为null,因为可能其他线程在同时创建了segment数组 // 因此再ensureSegment,用cas方式保证segment的安全性 s = ensureSegment(j); } // 到这里已经得到了一个非空的segment对象 // 进入segment的put流程 return s.put(key,hash,value,false); } // 这是Segment类中的put方法 final V put(K key,int hash,V value,boolean onlyIfAbsent){ // 尝试加锁 HashEntry<K,V> node = tryLock()?null: // 如果加锁不成功,进入scanAndLockforPut流程 // 如果时多核cpu对多tryLock64次,进入lock流程(一次加锁没有成功则阻塞) // 再尝试期间,还可以顺便查看该结点在链表中有没有,如果没有则创建出来 scanAndLockForPut(key,hash,value); // 到这一步segment已经加锁完成,可以安全执行 V oldValue; try{ HashEntry<K,V>[] tab = table; // 经过哈希计算,获取到哈希表中的下标 int index = (tab.length-1) & hash; // 根据下标得到链表的头结点 HashEntry<K,V> first = entryAt(tab,index); // 进入循环 for(HashEntry<K,V> e = first;;){ // 结点非空的情况,判断目标结点是否已存在 if(e != null){ K k; // 头结点的key和目标key相同,更新头结点的值 if((k = e.key) == key|| (e.hash == hash && key.equqls(k))){ oldValue = e.value; if(!onlyIfAbsent){ e.value = value; ++modCount; } break; } // 结点后裔 e = e.next; } // 目标结点不存在,新增结点 else { // 如果等待锁的时候已经创建过node,将node的next指向链表头(头插法) if(node != null) node.setNext(first); // 如果获取锁很顺利,没有创建node,则创建一个node,将next指向链表头 else node = new HashEntry<K,V>(hash,key,value,first); int c = count + 1; // 如果结点数量超过阈值,进行扩容 if (c > threashold && tab.length < MAXIMUM_CAPACITY) rehash(node); // 如果节点数量没有超过阈值,将新的node作为链表头 else setEntryAt(tab,index,node); ++modCount; count = c; oldValue = null; break; } } }finally{ unlock(); } return oldValue; }
rehash流程(扩容)
private void rehash(HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; // 新的容量为旧容量的两倍 int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1; // 阈值更新为新容量*装载因子 threshold = (int) (newCapacity * loadactor); // 创建新的哈希表 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; // 遍历旧哈希表的结点,搬迁到新的哈希表中 for (int i=0;i<oldCapacity;i++) { HashEntry<K,V> e = oldTable[i]; if(e != null) { HashEntry<K,V> next = e.next; // 通过哈希算法获得结点在新链表中的位置 int idx = e.hash & sizeMask; // next为空表示当前容器链表中只有一个结点,则将结点直接放入新链表中即可 if (next == null) newTable[idx] = e; else { HashEntry<K,V> lastRun = e; int lastIdx = idx; // 过一遍链表,尽可能把rehash后idx不变的结点重用 for(HashEntry<K,V> last = next; last != null; last = last.next){ int k = last.hash & sizeMask; // 如果新的idx和旧的idx不一致,记录新的结点的位置和结点 if(k != lastIdx) { lastIdx = k; lastRun = last; } } // 将旧结点放在新的位置上 newTable[lastIdx] = lastRun; // 剩余结点需要新建 for(HashEntry<K,V> p = e;p != lastRun;p = p.next){ V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h,p.key,v,n); } } } } // 扩容完成,加入新节点 int nodeIndex = node.hash & sizeMask; node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; // 将旧的哈希表替换为新的哈希表 table = newTable; }
get流程
get操作并未加锁,用了UNSAFE方法保证了可见性,扩容过程中,get先发生就从旧表中获取内容,get后发生就从新表中获取内容
public V get(Object key) { Setment<K,V> s; HashEntry<K,V>[] tab; int h = hash(key); // u为segment对象在数组中的偏移量 long u = (((h >>> segmentShift) & segmentMask)<<SSHIFT)+BASE; // s即定位到的segment if(( s = (Segment<K,V> UNSAFE.getObjectVolatile(segments,u)) != null && (tab = s.table != null) { // 定位到目标数组,并遍历,找到对应的键 for(HashEntry<K,V> e = (HashEntry<K,V> UNSAFE.getObjectVolatile (tab,((long)(((tab.length-1)&h)) << TSHIFT)+TBASE)); e != null; e = e.next) { K key; if((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
size流程
public int size(){ final Segment<K,V>[] segments = this.segments; int size; // size是否溢出 boolean overflow; // modCount的总和 long sum; // 上一次的结果 long last = 0L; // 重试的次数 int retries = -1; try{ for(;;){ // 如果超过重试次数,需要创建所有的segment并加锁 if(retries++ == RETRIES_BEFORE_LOCK){ for(int j = 0;j <segments.length;++j) ensureSegment(j).lock(); } sum = 0L; size = 0; overflow = false; for(int j=0;j<segments.length;++j){ Segment<K,V> seg = segmentAt(segments,j); if(seg != null) { // segment中修改的次数 sum += seg.modCount; // segment中元素的个数 int c = seg.count; // 小于0表示溢出了 if(c < 0 || (size += c) < 0) overflow = true; } } // 如果两次的结果一样,则退出循环 if(sum == last) break; last = sum; } }finally { // 如果发现重试次数超过了加锁阈值,表示加过锁了,则对segment进行解锁 if(retries > RETRIES_BEFORE_LOCK){ for(int j=0;j<segments.length;++j) segmentAt(segments,j).unlock(); } } return overflow?Integer.MAX_VALUE:size; }
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>,java.io.Serializable{
// 队列内部维护的结点
static class Node<E>{
E item;
/**
* 真正的后继节点
* 发生在出队时指向自己
* null,没有后继结点
* /
Node<E> next;
Node(E x) { item = x; }
}
}
初始化链表
last = head = new Node<E>(null)
,使用一个Dummy结点来占位,item为null
入队
一个新结点入队,让last指向新节点last = last.next = node
出队
Node<E> h = head;
Node<E> first = h.next;
// 断开头结点,即Dummy结点
h.next = h;
// 新的头结点指向
head = first;
// 获得需要返回的结点的值
E x = first.item;
// 将新的头结点设置为dummy结点
first.item = null;
return x;
用了两把锁和Dummy结点:
线程安全分析
public void put(E e) throws InterruptedException { if(e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); // put相关的锁 final ReentrantLock putLock = this.putLock; // count用来维护元素计数 final AtomicInteger count = this.count; putLock.lockInterruptibly(); try{ // 如果队列满了,则notFull条件进入等待 while(count.get()==capacity){ notFull.await(); } // 被唤醒后,入队且计数加一 enqueue(node); c = count.getAndIncrement(); // 如果还有空位,则唤醒其他线程 if(c+1<capacity) notFull.signal(); }finally{ putLock.unlock(); } // 如果队列中有元素,叫醒take线程 if(c == 0) // 为了减少竞争,这里调用的是notEmpty.signal()而不是notEmpty.signallAll() signalNotEmpty(); }
ConcurrentLinkedQueue的设计与LinkedBlokingQueue非常像,也是用两把锁来锁住头尾,但是锁用cas来实现
底层采用写入时拷贝的思想,增删改操作会将底层数组拷贝一份,更改操作在新数组上执行,不yi
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。