赞
踩
笔者在阅读此篇扩容机制时,对Doug Lea大神佩服的五体投地,满满的精华,还没有全部读懂,真的像一杯好酒,需要满满酝酿,细细品味乐趣以及其中奥妙呀,可以说ConcurrentHashMap的最璀璨的地方就在这里。 不扯了,上源码,具体流程均在代码提现,篇幅较多,请耐心阅读。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, //桶长度 stride; //单任务可执行的处理数 //NCPU是当前环境的cpu数, 如是1时,说明是单核心,那么就一个线程处理桶的总长度, 如果桶长度<16的话,那么按照16来处理; //如是多核cpu, 用长度/8然后在除以cpu数 < 16,则用16,反则按照计算的数处理; 这样做目的:是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // 开始初始化 try { //n << 1 相当于2 * n 也就是2倍的n @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // 防止内存不够,出现OOM sizeCtl = Integer.MAX_VALUE; return; } //将生产好的2倍node数组,更新到成员变量 nextTable = nextTab; //更新转移下标, n 就是老桶的长度 transferIndex = n; } //扩容2倍新表的长度. int nextn = nextTab.length; //声明一个转移标识节点 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); //推进标识,默认推进为ture boolean advance = true; //结束转移标志 boolean finishing = false; // to ensure sweep before committing nextTab //进入无限循环. i 表示下标,bound 表示当前线程可以处理的当前桶区间最小下标 for (int i = 0, bound = 0;;) { Node<K,V> f; //当前节点 int fh; //当前节点的hash //第一次进入数据转移 while (advance) { int nextIndex, //下一个获取数据的下标 nextBound; //循环一次后,还剩下区间数据量 /** * 这块是扩容重点,请仔细阅读: * 首先得一定要明确 --i 是在做什么. 代表什么含义. 其次bound,在这有充当什么角色. * * 想这种逻辑上下关联时,我的学习方法就是用数据代入, 就用 默认容量为16,阀值是12 ,现在插入13个数据为例: * 数据代入: * int n = tab.length = 16, * int stride = 16. 一个任务处理数 * Node<K,V>[] nextTable = nextTab 的长度 = 32 = 2 * n * int nextn = 32 = 2 * n * int transferIndex = 16 * int i = 0 * int bound = 0 * * 进入第一次最外层死循环流程: * 1 advance默认为true,因此默认进入while循环 * 2 那么先判断 --i>=bound 相当于 0-1 >= 0 || false 那么此判断必然不走 * 3 声明nextIndex = transferIndex = 16 <=0 很明显走不通 * 4 2和3判断走不通,则通过cas对主内存transferindex赋值nextBound = nextIndex[16] > stride[16] ? nextIndex[16] - stride[16] : 0 * 那么很明显此时nextBound = 0 也就以为这现在桶区间的为0了. 这意思就是说,原原有的桶总长度 - 一个线程处理任务数, 看看还有木有剩余的数据需要转移 * 那我们现在桶长度也就才16 一个线程就能转移16个数据,因此一个线程就能搞定了,不需要别的线程帮助. * 因此下一个线程处理任务区间为0, 那么i就是被指向下个具体要处理的数据下标, nextIndex是桶长度 - 1 = 15 所以从这来看,转移数据时,是倒着来处理; * 最后这个while循环就可以中断,并且设置advance = false 为了防止别的线程进入. * * 进入第二次最外层死循环流程: * 因为下标为15的=null,成功设置老tab的当前index为fwd节点后,设置advance=true,因此接着走: * 判断--i>=bound 相当于 15-1 >= 0 || false ,判断通过,开始处理这个区间里面的倒数第二个下标,并且设置advance = false 为了防止别的线程进入. * * 进入第三次最外层死循环流程: * 因为下标为14的=null,成功设置老tab的当前index为fwd节点后,设置advance=true,因此接着走: * 判断--i>=bound 相当于 14-1 >= 0 || false ,判断通过,开始处理这个区间里面的倒数第三个下标,并且设置advance = false 为了防止别的线程进入. * * 进入第四次最外层死循环流程: * 因为下标为13的=null,成功设置老tab的当前index为fwd节点后,设置advance=true,因此接着走: * 判断--i>=bound 相当于 13-1 >= 0 || false ,判断通过,开始处理这个区间里面的倒数第四个下标,并且设置advance = false 为了防止别的线程进入. * * * 进入第五次最外层死循环流程: * 下标为12的节点已经将数据转移成功,并设置老tab的当前index为fwd节点后,设置advance=true,因此接着走: * 判断--i>=bound 相当于 12-1 >= 0 || false ,判断通过,开始处理这个区间里面的倒数第五个下标,并且设置advance = false 为了防止别的线程进入. * * 进入第6\7\8\9\10\....次最外层死循环流程: * 下标为11\10\9\8\7\6\...\0的节点已经将数据转移成功,并设置老tab的当前index为fwd节点后,设置advance=true,因此接着走: * 开始处理这个区间里面的倒数第1个下标,并且设置advance = false 为了防止别的线程进入. * * 5当下标=0 ,也成功后,在看逻辑: * 判断--i>=bound 相当于 0-1 >= 0 || false ,判断不通过 * 判断 nextIndex还有木有数据处理, 此时就通过. 在第一次进入循环时通过cas已经设置transferIndex 为 nextBound . 那么也就0 ,因为在第一步时,已经明确是没有下一个区间范围数据了. * 设置i为-1 , advance = false 为了防止别的线程进入. */ if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } /** * 第一次最外层死循环流程: * 此时i = 15 15 < 0 || 15 >= 16 || 15 + 16 >= 32 明显不满足,跳过 * * 第二次最外层死循环流程: * 此时i = 14 14 < 0 || 14 >= 16 || 14 + 16 >= 32 明显不满足,跳过 * * 第三次最外层死循环流程: * 此时i = 13 13 < 0 || 13 >= 16 || 13 + 16 >= 32 明显不满足,跳过 * * 第四次最外层死循环流程: * 此时i = 12 12 < 0 || 12 >= 16 || 12 + 16 >= 32 明显不满足,跳过 * * 当第N次外层循环后: * 此时i = -1 -1 < 0 || -1 >= 16 || -1 + 16 >= 32 满足了第一条件, 这个我也是反一脑子问号, 第一个判断是知道最后这一段扩容结束了,其他的俩个判断咱愚钝还不明白,有大佬知道,请指教. * 有一个finishing 是为啥呢? 因为扩容可能是多个线程一起在做,得等最后一个扩容结束的线程,进入,那咋知道当前线程是不是最后一个扩容结束呢? 看下面个判断: * 是不是懵逼? 不怕不怕. 看代码详细注释 * */ if (i < 0 || i >= n || i + n >= nextn) { int sc; //控制map的标识 //如果显示整体扩容结束,就进入 if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } /** * 懵逼是因为sizeCtl需要结合触发扩容前的方法,找答案. * 其实也就是addCount()方法中,当发现达到阀值后, 对原长度[n还是用16为例子] 做了resizeStamp(n) = 32795 * 在通过U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2) 方式将sizeCtl在rs的基础上左移16位 得到 -2145714176 在+2 得到最终-2145714174 * * 好,知道主内存sizeCtl的值咋来后,在看这compareAndSwapInt方法: * SIZECTL 其实是 valueOffset value的相对地址值 * sc = sizeCtl 这个是期望值 也就是-2145714174 * sc - 1 = 是置换值 -2145714174 -1 = -2145714175 * 那么此时主内存sizeCtl = -2145714175 但sc = -2145714174 * 在走判断 (sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT = -2145714174 - 2 = -2145714176 看上面在扩容前 已经做了 resizeStamp(n) << RESIZE_STAMP_SHIFT + 2的操作, * 因此如果全部扩容结束sc - 2 正好就等于原长度低位左移16的值. 也就标识扩容完成, 反则挑出次方法 * */ if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; //如果扩容结束,那就把advance = true 并且设置finishing=true已结束 i = n; // 但是嘞, 以防万一有漏掉的数据没有迁移出去,在循环检查一遍, 正常情况,原tab的数据都转移后,都被设置了fwd节点,so都只会走"else if ((fh = f.hash) == MOVED) { // 当前节点hash为-1,说明已经被处理过了,调出本次循环"这个判断了 } } /** * 第一次最外层死循环流程: * 此时i = 15 通过cas方式获得tab在i位置上的Node节点, =null的话,说明没值嘛. * 我们预计要插入13个数据,那第15个位置的数据很明显为null, * 那么此时要在老表的这个位置标识一个占位符也就是: 带有扩容好2倍newTab容器的ForwardingNode节点; * fwd放置成功,那么设置advance = ture,接着处理下一个数据,也就是第下标为14的数据. * 最后跳出第一次最外层死循环 * * 第二次最外层死循环流程: * 此时i = 14 通过cas方式获得tab在i位置上的Node节点=null的,在下标为14的位置放置带有扩容好2倍newTab容器的ForwardingNode节点; * 放置成功,那么设置advance = ture,接着处理下一个数据,也就是第下标为13的数据. * 最后跳出第二次最外层死循环 * * 第三次最外层死循环流程: * 此时i = 13 通过cas方式获得tab在i位置上的Node节点=null的,在下标为13的位置放置带有扩容好2倍newTab容器的ForwardingNode节点; * 思考下:我们插入13条数据,为啥此时没数据? ------> 因为总量为16时,扩容阀值是12,因此当下标为12的数据放进后,走到put方法addCount时,检测到已达到阀值所以就已经进入扩容了. 并不是在插第13条数据时,才触发扩容滴. * 放置成功,那么设置advance = ture,接着处理下一个数据,也就是第下标为12的数据. * 最后跳出第三次最外层死循环 * * 第四次最外层死循环流程: * 此时下标为12的tab是有数据的,跳过 */ else if ((f = tabAt(tab, i)) == null) { advance = casTabAt(tab, i, null, fwd); } else if ((fh = f.hash) == MOVED) { // 当前节点hash为-1,说明已经被处理过了,调出本次循环 advance = true; // already processed } /** * 第四次最外层死循环流程: * 此时下标为12的tab是有数据的,则锁住当前节点,因为方式此时putVal 的时候向链表插入数据. * 具体详解逐行看下面代码. * 将下标为12的数据转移完成后,设置advance = ture,接着处理下一个数据,也就是第下标为11的数据. * 最后跳出第四次最外层死循环 * * 第五次最外层死循环流程: * 走上述同样逻辑; * 将下标为11的数据转移完成后,设置advance = ture,接着处理下一个数据,也就是第下标为10的数据. * 最后跳出第五次最外层死循环 * * 进入第6\7\8\9\10\....次最外层死循环流程: * 处理成功后,跳出第N次最外层死循环 */ else { synchronized (f) {//锁节点 if (tabAt(tab, i) == f) { //在此判断当前i节点 是否与前面f相等 Node<K,V> ln, //low 低位桶 hn; //height 高位桶 //f 的 hash 值大于 0 说明可能是单节点,或者是链表。 hash值是-2表示这时一个TreeBin节点 if (fh >= 0) { /** * 划重点阶段: * 用当前节点的hash值 与 原桶长度[16]做 与运算 ,因为桶的长度是2次幂[类似0000 1000],所以在这里取与的话,也只有2种情况: * 取决于fh的第n位于原桶长度的第n位如果都是1,那么结果的第n为也为1,否则为0 * 如果是0 则放 低位桶 是1则放 高位桶; * 那么你就会发现一个规则, 原当前节点的长度会被缩小, 位于低位通的节点还在原位置, 位于高位桶的数据被转移出来了,新的位置是当前节点位置 + 扩容的原长度 * 先理解下,一会供上图; */ int runBit = fh & n; Node<K,V> lastRun = f;//某节点, 为啥说是某节点呢? 如果是链表 则就是最后一个节点开始转移,称尾节点, 如是不是链表,就单节点,则按上述规则更新在新桶的位置 //查看f节点next是否为空,不为空, 则循环到最后一个位置,做与运算,按照0与1,找高低位桶 for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; //链表尾结点 与 原桶总长度 做与运算, 得到 0 或 1 /** * 这块是想说明啥问题呢? * 列如,不用关注 取余的tab的index是否是一致. * 现有一链表, 第一个数据 hash 为1 * 第2个数据 hash 为2 * 第3个数据 hash 为3 * 第4个数据 hash 为4 * 第5个数据 hash 为17 * 第6个数据 hash 为18 * 第7个数据 hash 为19 * n = 原长度 16 * 与运算 : * 首节点[其实就是Node<K,V> lastRun = f; 在for前面的] 结果为0 * 第2个数据 结果为0 * 第3个数据 结果为0 * 第4个数据 结果为0 * 第5个数据 结果为1 * * * 由上面看出来一个情况, 那就是1-4其实都在低位桶, 只有5才会去高位桶. 那么Doug Lea的设计会做浪费的循环么? 答案是No,继续往下走,找答案,看看是如何做到 避免不必要循环. * */ //如果该节点 与 上一个节点的取与 结果不同,则更新runBit 与 lastRun 更新成当前节点,其实也就是一个分界线 if (b != runBit) { runBit = b; lastRun = p;// 这个 lastRun 保证后面的节点与自己的取于值相同,避免后面没有必要的循环 } } /** * 接着上面的例子: * 第5个数据 与结果是 1,因此 这时 runBit = 1 lastRun = hash为17的节点 * * 那么走else逻辑, 将hash为17的放置高位桶 */ //0 挪至低位桶 if (runBit == 0) { ln = lastRun; hn = null; } else {//1 挪至高位桶. hn = lastRun; ln = null; } /** * 在此循环原来链表 * p = f = hash为1 与 lastRun[hash为17] 不相等循环进入 * 很明显,hash= 1\2\3\4才会进入循环, 并且都位于低位桶 这样一来 ln的这部分数据转移成功 * * 那么剩下下来的17 就单独移到高位桶了. 你可以在多想一步, 如果17后面 还有18 19 20 呢, 是不是阔以就不用循环了 * * 其实呢 链表的取与的情况还有其他中,大家可以一一代入走一遍. */ for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //将前一个循环的低位桶链,放置到当前下标的位置,其实没有变嘛. setTabAt(nextTab, i, ln); //将高位桶的链,在当前下标的基础在加上原有tab长度,就是高位桶的在新tab的新index了. setTabAt(nextTab, i + n, hn); //将老tab的当前下标替换成fwd,告诉其他线程,这个数据咱们已经处理了. setTabAt(tab, i, fwd); advance = true; } /** * 注意哦,第4层最外层的循环 还能走是红黑树结构哈. 当前下标还是12 * 但器原理与上述链表一致,就是数据结构不同,逻辑不同而已 */ else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; // for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); //去hash 与 原长度16做与运算 . 0放置低位桶 if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } //1放置高位桶 else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //与链表不同的是, 转移数据后,原来数的数量就会减少了, 那么依旧要满足之前的规则, 数长度小于6的话,就得转链表 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; //这块逻辑与链表一致, 低位桶 在新tab的index 不变, 高位桶 在新tab的index 在原基础上+原长度 setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
整体步骤分布4大步:
1、计算每个线程需要处理桶数,并将在原tab表上扩大2倍;
2、利用死循环,开始倒着遍历桶中的各种数据结构,并保证每个线程处理数据的独立
3、如果是链表或者数,那么同步转移数据,并且把数据分成高低位段,防止新的tab中
4、判断扩容结束,并重新检查是否有遗漏数据
其中可以发现,除了在针对单节点使用同步锁之外,其他均采用无锁方式,确保各个线程之间对数据处理的安全和高效。而且也能想到在其他正在put数据的线程,不让闲着帮忙一起扩容的思路令人赞叹。。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。