当前位置:   article > 正文

还不懂 ConcurrentHashMap ?这份源码分析了解一下_concurrenthashmap源码分析

concurrenthashmap源码分析

 JDK 8容器初始化

1.源码分析

在JDK 8中的ConcurrentHashMap一共有5个构造方法,这几个构造方法中都没有对内部的数组做初始化, 只是对一些变量的初始值做了处理,其中ConcurrentHashMap的数组初始化是在第一次添加元素时完成的。

  1. //没有维护任何变量的操作,如果调用该方法,数组长度默认是16
  2. public ConcurrentHashMap() {
  3. }
  1. //传递进来一个初始容量,ConcurrentHashMap会基于这个值计算一个比这个值大的2的幂次方数作为初始容量
  2. public ConcurrentHashMap(int initialCapacity) {
  3.     if (initialCapacity < 0)
  4.         throw new IllegalArgumentException();
  5.     int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
  6.                MAXIMUM_CAPACITY :
  7.                tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
  8.     this.sizeCtl = cap;
  9. }

注意:

当我们调用上面这个方法得到的初始容量,和HashMap以及JDK 7中的ConcurrentHashMap不同,即使我们传递的是一个2的幂次方数,该方法计算出来的初始容量依然是比该值大的2的幂次方数。

  1. //调用四个参数的构造
  2. public ConcurrentHashMap(int initialCapacity, float loadFactor) {
  3.     this(initialCapacity, loadFactor, 1);
  4. }
  1. //计算一个大于或者等于给定的容量值,该值是2的幂次方数作为初始容量
  2. public ConcurrentHashMap(int initialCapacity,
  3.                          float loadFactor, int concurrencyLevel) {
  4.     if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
  5.         throw new IllegalArgumentException();
  6.     if (initialCapacity < concurrencyLevel)   // Use at least as many bins
  7.         initialCapacity = concurrencyLevel;   // as estimated threads
  8.     long size = (long)(1.0 + (long)initialCapacity / loadFactor);
  9.     int cap = (size >= (long)MAXIMUM_CAPACITY) ?
  10.         MAXIMUM_CAPACITY : tableSizeFor((int)size);
  11.     this.sizeCtl = cap;
  12. }
  1. //基于一个Map集合,构建一个ConcurrentHashMap
  2. //初始容量为16
  3. public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
  4.     this.sizeCtl = DEFAULT_CAPACITY;
  5.     putAll(m);
  6. }

2.sizeCtl含义解释

注意:这些构造方法中,都会涉及sizeCtl变量,它是在构造方法里面的作用非常重要。这里,我们先简单解释这个变量的不同取值的含义。在后续的源码分析过程中,我们会进一步阐述这些含义。

  • sizeCtl为0:代表数组未初始化, 且数组的初始容量为16;

  • sizeCtl为正数:如果数组未初始化,那么其记录的是数组的初始容量,如果数组已经初始化,那么其记录的是数组的扩容阈值;

  • sizeCtl为-1:表示数组正在进行初始化;

  • sizeCtl小于0:并且不是-1,表示数组正在扩容, -(1+n),表示此时有n个线程正在共同完成数组的扩容操作。

 JDK 8添加安全

1.源码分析

1.1 添加元素的put/putVal方法

该方法的源码如下:

  1. public V put(K key, V value) {
  2.     return putVal(key, value, false);
  3. }
  1. final V putVal(K key, V valueboolean onlyIfAbsent) {
  2.     //如果有空值或者空键,直接抛异常
  3.     if (key == null || value == null) throw new NullPointerException();
  4.     //基于key计算hash值,并进行一定的扰动
  5.     int hash = spread(key.hashCode());
  6.     //记录某个桶上元素的个数,如果超过8个,会转成红黑树
  7.     int binCount = 0;
  8.     for (Node<K,V>[] tab = table;;) {
  9.         Node<K,V> f; int n, i, fh;
  10.         //如果数组还未初始化,先对数组进行初始化
  11.         if (tab == null || (n = tab.length== 0)
  12.             tab = initTable();
  13.             //如果hash计算得到的桶位置没有元素,利用cas将元素添加
  14.         else if ((f = tabAt(tab, i = (n - 1& hash)) == null) {
  15.             //cas+自旋(和外侧的for构成自旋循环),保证元素添加安全
  16.             if (casTabAt(tab, i, null,
  17.                          new Node<K,V>(hash, keyvaluenull)))
  18.                 break;                   // no lock when adding to empty bin
  19.         }
  20.         //如果hash计算得到的桶位置元素的hash值为MOVED,证明正在扩容,那么协助扩容
  21.         else if ((fh = f.hash) == MOVED)
  22.             tab = helpTransfer(tab, f);
  23.         else {
  24.             //hash计算的桶位置元素不为空,且当前没有处于扩容操作,进行元素添加
  25.             V oldVal = null;
  26.             //对当前桶进行加锁,保证线程安全,执行元素添加操作
  27.             synchronized (f) {
  28.                 if (tabAt(tab, i) == f) {
  29.                     //普通链表节点
  30.                     if (fh >= 0) {
  31.                         binCount = 1;
  32.                         for (Node<K,V> e = f;; ++binCount) {
  33.                             K ek;
  34.                             if (e.hash == hash &&
  35.                                 ((ek = e.key== key ||
  36.                                  (ek != null && key.equals(ek)))) {
  37.                                 oldVal = e.val;
  38.                                 if (!onlyIfAbsent)
  39.                                     e.val = value;
  40.                                 break;
  41.                             }
  42.                             Node<K,V> pred = e;
  43.                             if ((e = e.next== null) {
  44.                                 pred.next = new Node<K,V>(hash, key,
  45.                                                           valuenull);
  46.                                 break;
  47.                             }
  48.                         }
  49.                     }
  50.                     //树节点,将元素添加到红黑树中
  51.                     else if (f instanceof TreeBin) {
  52.                         Node<K,V> p;
  53.                         binCount = 2;
  54.                         if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  55.                                                        value)) != null) {
  56.                             oldVal = p.val;
  57.                             if (!onlyIfAbsent)
  58.                                 p.val = value;
  59.                         }
  60.                     }
  61.                 }
  62.             }
  63.             if (binCount != 0) {
  64.                 //链表长度大于/等于8,将链表转成红黑树
  65.                 if (binCount >= TREEIFY_THRESHOLD)
  66.                     treeifyBin(tab, i);
  67.                 //如果是重复键,直接将旧值返回
  68.                 if (oldVal != null)
  69.                     return oldVal;
  70.                 break;
  71.             }
  72.         }
  73.     }
  74.     //添加的是新元素,维护集合长度,并判断是否要进行扩容操作
  75.     addCount(1L, binCount);
  76.     return null;
  77. }

通过以上源码,我们可以看到,当需要添加元素时,会针对当前元素所对应的桶位进行加锁操作。这样一方面保证元素添加时,多线程的安全,同时对某个桶位加锁不会影响其他桶位的操作,进一步提升多线程的并发效率。

1.2 数组初始化的initTable方法

该方法的源码如下:

  1. private final Node<K,V>[] initTable() {
  2.     Node<K,V>[] tab; int sc;
  3.     //cas+自旋,保证线程安全,对数组进行初始化操作
  4.     while ((tab = table== null || tab.length == 0) {
  5.         //如果sizeCtl的值(-1)小于0,说明此时正在初始化, 让出cpu
  6.         if ((sc = sizeCtl) < 0)
  7.             Thread.yield(); // lost initialization race; just spin
  8.         //cas修改sizeCtl的值为-1,修改成功,进行数组初始化,失败,继续自旋
  9.         else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
  10.             try {
  11.                 if ((tab = table== null || tab.length == 0) {
  12.                     //sizeCtl为0,取默认长度16,否则去sizeCtl的值
  13.                     int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
  14.                     @SuppressWarnings("unchecked")
  15.                     //基于初始长度,构建数组对象
  16.                     Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  17.                     table = tab = nt;
  18.                     //计算扩容阈值,并赋值给sc
  19.                     sc = n - (n >>> 2);
  20.                 }
  21.             } finally {
  22.                 //将扩容阈值,赋值给sizeCtl
  23.                 sizeCtl = sc;
  24.             }
  25.             break;
  26.         }
  27.     }
  28.     return tab;
  29. }

2.图解

put方法的加锁原理如下图所示:

 JDK 8扩容安全

1.源码分析

扩容过程的源码如下所示:

  1. private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  2.     int n = tab.length, stride;
  3.     //如果是多cpu,那么每个线程划分任务,最小任务量是16个桶位的迁移
  4.     if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  5.         stride = MIN_TRANSFER_STRIDE; // subdivide range
  6.     //如果是扩容线程,此时新数组为null
  7.     if (nextTab == null) {            // initiating
  8.         try {
  9.             @SuppressWarnings("unchecked")
  10.             //两倍扩容创建新数组
  11.             Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
  12.             nextTab = nt;
  13.         } catch (Throwable ex) {      // try to cope with OOME
  14.             sizeCtl = Integer.MAX_VALUE;
  15.             return;
  16.         }
  17.         nextTable = nextTab;
  18.         //记录线程开始迁移的桶位,从后往前迁移
  19.         transferIndex = n;
  20.     }
  21.     //记录新数组的末尾
  22.     int nextn = nextTab.length;
  23.     //已经迁移的桶位,会用这个节点占位(这个节点的hash值为-1--MOVED)
  24.     ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
  25.     boolean advance = true;
  26.     boolean finishing = false// to ensure sweep before committing nextTab
  27.     for (int i = 0, bound = 0;;) {
  28.         Node<K,V> f; int fh;
  29.         while (advance) {
  30.             int nextIndex, nextBound;
  31.             //i记录当前正在迁移桶位的索引值
  32.             //bound记录下一次任务迁移的开始桶位
  33.             
  34.             //--i >= bound 成立表示当前线程分配的迁移任务还没有完成
  35.             if (--i >= bound || finishing)
  36.                 advance = false;
  37.             //没有元素需要迁移 -- 后续会去将扩容线程数减1,并判断扩容是否完成
  38.             else if ((nextIndex = transferIndex) <= 0) {
  39.                 i = -1;
  40.                 advance = false;
  41.             }
  42.             //计算下一次任务迁移的开始桶位,并将这个值赋值给transferIndex
  43.             else if (U.compareAndSwapInt
  44.                      (this, TRANSFERINDEX, nextIndex,
  45.                       nextBound = (nextIndex > stride ?
  46.                                    nextIndex - stride : 0))) {
  47.                 bound = nextBound;
  48.                 i = nextIndex - 1;
  49.                 advance = false;
  50.             }
  51.         }
  52.         //如果没有更多的需要迁移的桶位,就进入该if
  53.         if (i < 0 || i >= n || i + n >= nextn) {
  54.             int sc;
  55.             //扩容结束后,保存新数组,并重新计算扩容阈值,赋值给sizeCtl
  56.             if (finishing) {
  57.                 nextTable = null;
  58.                 table = nextTab;
  59.                 sizeCtl = (n << 1) - (n >>> 1);
  60.                 return;
  61.             }
  62.                    //扩容任务线程数减1
  63.             if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
  64.                 //判断当前所有扩容任务线程是否都执行完成
  65.                 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
  66.                     return;
  67.                 //所有扩容线程都执行完,标识结束
  68.                 finishing = advance = true;
  69.                 i = n; // recheck before commit
  70.             }
  71.         }
  72.         //当前迁移的桶位没有元素,直接在该位置添加一个fwd节点
  73.         else if ((f = tabAt(tab, i)) == null)
  74.             advance = casTabAt(tab, i, null, fwd);
  75.         //当前节点已经被迁移
  76.         else if ((fh = f.hash) == MOVED)
  77.             advance = true// already processed
  78.         else {
  79.             //当前节点需要迁移,加锁迁移,保证多线程安全
  80.             //此处迁移逻辑和jdk7的ConcurrentHashMap相同,不再赘述
  81.             synchronized (f) {
  82.                 if (tabAt(tab, i) == f) {
  83.                     Node<K,V> ln, hn;
  84.                     if (fh >= 0) {
  85.                         int runBit = fh & n;
  86.                         Node<K,V> lastRun = f;
  87.                         for (Node<K,V> p = f.next; p != null; p = p.next) {
  88.                             int b = p.hash & n;
  89.                             if (b != runBit) {
  90.                                 runBit = b;
  91.                                 lastRun = p;
  92.                             }
  93.                         }
  94.                         if (runBit == 0) {
  95.                             ln = lastRun;
  96.                             hn = null;
  97.                         }
  98.                         else {
  99.                             hn = lastRun;
  100.                             ln = null;
  101.                         }
  102.                         for (Node<K,V> p = f; p != lastRun; p = p.next) {
  103.                             int ph = p.hash; K pk = p.key; V pv = p.val;
  104.                             if ((ph & n) == 0)
  105.                                 ln = new Node<K,V>(ph, pk, pv, ln);
  106.                             else
  107.                                 hn = new Node<K,V>(ph, pk, pv, hn);
  108.                         }
  109.                         setTabAt(nextTab, i, ln);
  110.                         setTabAt(nextTab, i + n, hn);
  111.                         setTabAt(tab, i, fwd);
  112.                         advance = true;
  113.                     }
  114.                     else if (f instanceof TreeBin) {
  115.                         TreeBin<K,V> t = (TreeBin<K,V>)f;
  116.                         TreeNode<K,V> lo = null, loTail = null;
  117.                         TreeNode<K,V> hi = null, hiTail = null;
  118.                         int lc = 0, hc = 0;
  119.                         for (Node<K,V> e = t.first; e != null; e = e.next) {
  120.                             int h = e.hash;
  121.                             TreeNode<K,V> p = new TreeNode<K,V>
  122.                                 (h, e.key, e.val, nullnull);
  123.                             if ((h & n) == 0) {
  124.                                 if ((p.prev = loTail) == null)
  125.                                     lo = p;
  126.                                 else
  127.                                     loTail.next = p;
  128.                                 loTail = p;
  129.                                 ++lc;
  130.                             }
  131.                             else {
  132.                                 if ((p.prev = hiTail) == null)
  133.                                     hi = p;
  134.                                 else
  135.                                     hiTail.next = p;
  136.                                 hiTail = p;
  137.                                 ++hc;
  138.                             }
  139.                         }
  140.                         ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
  141.                             (hc != 0) ? new TreeBin<K,V>(lo) : t;
  142.                         hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
  143.                             (lc != 0) ? new TreeBin<K,V>(hi) : t;
  144.                         setTabAt(nextTab, i, ln);
  145.                         setTabAt(nextTab, i + n, hn);
  146.                         setTabAt(tab, i, fwd);
  147.                         advance = true;
  148.                     }
  149.                 }
  150.             }
  151.         }
  152.     }
  153. }

2.图解

 JDK 8多线程扩容效率改进

多线程协助扩容的操作会在两个地方被触发:

①. 当添加元素时,发现添加的元素对用的桶位为fwd节点,就会先去协助扩容,然后再添加元素;

②. 当添加完元素后,判断当前元素个数达到了扩容阈值,此时发现sizeCtl的值小于0,并且新数组不为空,这时会去协助扩容。

1.源码分析

1.1 元素未添加,先协助扩容,扩容完后再添加元素

  1. final V putVal(K key, V valueboolean onlyIfAbsent) {
  2.     if (key == null || value == null) throw new NullPointerException();
  3.     int hash = spread(key.hashCode());
  4.     int binCount = 0;
  5.     for (Node<K,V>[] tab = table;;) {
  6.         Node<K,V> f; int n, i, fh;
  7.         if (tab == null || (n = tab.length== 0)
  8.             tab = initTable();
  9.         else if ((f = tabAt(tab, i = (n - 1& hash)) == null) {
  10.             if (casTabAt(tab, i, null,
  11.                          new Node<K,V>(hash, keyvaluenull)))
  12.                 break;                   // no lock when adding to empty bin
  13.         }
  14.         //发现此处为fwd节点,协助扩容,扩容结束后,再循环回来添加元素
  15.         else if ((fh = f.hash) == MOVED)
  16.             tab = helpTransfer(tab, f);
  17.         
  18.         //省略代码
  1. final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
  2.     Node<K,V>[] nextTab; int sc;
  3.     if (tab != null && (f instanceof ForwardingNode) &&
  4.         (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
  5.         int rs = resizeStamp(tab.length);
  6.         while (nextTab == nextTable && table == tab &&
  7.                (sc = sizeCtl) < 0) {
  8.             if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
  9.                 sc == rs + MAX_RESIZERS || transferIndex <= 0)
  10.                 break;
  11.             if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
  12.                 //扩容,传递一个不是null的nextTab
  13.                 transfer(tab, nextTab);
  14.                 break;
  15.             }
  16.         }
  17.         return nextTab;
  18.     }
  19.     return table;
  20. }

1.2 先添加元素,再协助扩容

  1. private final void addCount(long x, int check) {
  2.     //省略代码
  3.     
  4.     if (check >= 0) {
  5.         Node<K,V>[] tab, nt; int n, sc;
  6.               //元素个数达到扩容阈值
  7.         while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
  8.                (n = tab.length< MAXIMUM_CAPACITY) {
  9.             int rs = resizeStamp(n);
  10.             //sizeCtl小于0,说明正在执行扩容,那么协助扩容
  11.             if (sc < 0) {
  12.                 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
  13.                     sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
  14.                     transferIndex <= 0)
  15.                     break;
  16.                 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
  17.                     transfer(tab, nt);
  18.             }
  19.             else if (U.compareAndSwapInt(this, SIZECTL, sc,
  20.                                          (rs << RESIZE_STAMP_SHIFT) + 2))
  21.                 transfer(tab, null);
  22.             s = sumCount();
  23.         }
  24.     }
  25. }

注意:扩容的代码都在transfer方法中,这里不再赘述

2.图解

扩容过程原理如下图所示:

以上就是JDK 8中ConcurrentHashMap的初始化及扩容原理,现在你明白了吗?

添加下面学姐微信干货天天都不断。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/339685
推荐阅读
相关标签
  

闽ICP备14008679号