赞
踩
JDK 8容器初始化
1.源码分析
在JDK 8中的ConcurrentHashMap一共有5个构造方法,这几个构造方法中都没有对内部的数组做初始化, 只是对一些变量的初始值做了处理,其中ConcurrentHashMap的数组初始化是在第一次添加元素时完成的。
- //没有维护任何变量的操作,如果调用该方法,数组长度默认是16
- public ConcurrentHashMap() {
- }
- //传递进来一个初始容量,ConcurrentHashMap会基于这个值计算一个比这个值大的2的幂次方数作为初始容量
- public ConcurrentHashMap(int initialCapacity) {
- if (initialCapacity < 0)
- throw new IllegalArgumentException();
- int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
- MAXIMUM_CAPACITY :
- tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
- this.sizeCtl = cap;
- }
注意:
当我们调用上面这个方法得到的初始容量,和HashMap以及JDK 7中的ConcurrentHashMap不同,即使我们传递的是一个2的幂次方数,该方法计算出来的初始容量依然是比该值大的2的幂次方数。
- //调用四个参数的构造
- public ConcurrentHashMap(int initialCapacity, float loadFactor) {
- this(initialCapacity, loadFactor, 1);
- }
- //计算一个大于或者等于给定的容量值,该值是2的幂次方数作为初始容量
- public ConcurrentHashMap(int initialCapacity,
- float loadFactor, int concurrencyLevel) {
- if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
- throw new IllegalArgumentException();
- if (initialCapacity < concurrencyLevel) // Use at least as many bins
- initialCapacity = concurrencyLevel; // as estimated threads
- long size = (long)(1.0 + (long)initialCapacity / loadFactor);
- int cap = (size >= (long)MAXIMUM_CAPACITY) ?
- MAXIMUM_CAPACITY : tableSizeFor((int)size);
- this.sizeCtl = cap;
- }
- //基于一个Map集合,构建一个ConcurrentHashMap
- //初始容量为16
- public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
- this.sizeCtl = DEFAULT_CAPACITY;
- putAll(m);
- }
2.sizeCtl含义解释
注意:这些构造方法中,都会涉及sizeCtl变量,它是在构造方法里面的作用非常重要。这里,我们先简单解释这个变量的不同取值的含义。在后续的源码分析过程中,我们会进一步阐述这些含义。
sizeCtl为0:代表数组未初始化, 且数组的初始容量为16;
sizeCtl为正数:如果数组未初始化,那么其记录的是数组的初始容量,如果数组已经初始化,那么其记录的是数组的扩容阈值;
sizeCtl为-1:表示数组正在进行初始化;
sizeCtl小于0:并且不是-1,表示数组正在扩容, -(1+n),表示此时有n个线程正在共同完成数组的扩容操作。
JDK 8添加安全
1.源码分析
1.1 添加元素的put/putVal方法
该方法的源码如下:
- public V put(K key, V value) {
- return putVal(key, value, false);
- }
- final V putVal(K key, V value, boolean onlyIfAbsent) {
- //如果有空值或者空键,直接抛异常
- if (key == null || value == null) throw new NullPointerException();
- //基于key计算hash值,并进行一定的扰动
- int hash = spread(key.hashCode());
- //记录某个桶上元素的个数,如果超过8个,会转成红黑树
- int binCount = 0;
- for (Node<K,V>[] tab = table;;) {
- Node<K,V> f; int n, i, fh;
- //如果数组还未初始化,先对数组进行初始化
- if (tab == null || (n = tab.length) == 0)
- tab = initTable();
- //如果hash计算得到的桶位置没有元素,利用cas将元素添加
- else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
- //cas+自旋(和外侧的for构成自旋循环),保证元素添加安全
- if (casTabAt(tab, i, null,
- new Node<K,V>(hash, key, value, null)))
- break; // no lock when adding to empty bin
- }
- //如果hash计算得到的桶位置元素的hash值为MOVED,证明正在扩容,那么协助扩容
- else if ((fh = f.hash) == MOVED)
- tab = helpTransfer(tab, f);
- else {
- //hash计算的桶位置元素不为空,且当前没有处于扩容操作,进行元素添加
- V oldVal = null;
- //对当前桶进行加锁,保证线程安全,执行元素添加操作
- synchronized (f) {
- if (tabAt(tab, i) == f) {
- //普通链表节点
- if (fh >= 0) {
- binCount = 1;
- for (Node<K,V> e = f;; ++binCount) {
- K ek;
- if (e.hash == hash &&
- ((ek = e.key) == key ||
- (ek != null && key.equals(ek)))) {
- oldVal = e.val;
- if (!onlyIfAbsent)
- e.val = value;
- break;
- }
- Node<K,V> pred = e;
- if ((e = e.next) == null) {
- pred.next = new Node<K,V>(hash, key,
- value, null);
- break;
- }
- }
- }
- //树节点,将元素添加到红黑树中
- else if (f instanceof TreeBin) {
- Node<K,V> p;
- binCount = 2;
- if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
- value)) != null) {
- oldVal = p.val;
- if (!onlyIfAbsent)
- p.val = value;
- }
- }
- }
- }
- if (binCount != 0) {
- //链表长度大于/等于8,将链表转成红黑树
- if (binCount >= TREEIFY_THRESHOLD)
- treeifyBin(tab, i);
- //如果是重复键,直接将旧值返回
- if (oldVal != null)
- return oldVal;
- break;
- }
- }
- }
- //添加的是新元素,维护集合长度,并判断是否要进行扩容操作
- addCount(1L, binCount);
- return null;
- }
通过以上源码,我们可以看到,当需要添加元素时,会针对当前元素所对应的桶位进行加锁操作。这样一方面保证元素添加时,多线程的安全,同时对某个桶位加锁不会影响其他桶位的操作,进一步提升多线程的并发效率。
1.2 数组初始化的initTable方法
该方法的源码如下:
- private final Node<K,V>[] initTable() {
- Node<K,V>[] tab; int sc;
- //cas+自旋,保证线程安全,对数组进行初始化操作
- while ((tab = table) == null || tab.length == 0) {
- //如果sizeCtl的值(-1)小于0,说明此时正在初始化, 让出cpu
- if ((sc = sizeCtl) < 0)
- Thread.yield(); // lost initialization race; just spin
- //cas修改sizeCtl的值为-1,修改成功,进行数组初始化,失败,继续自旋
- else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
- try {
- if ((tab = table) == null || tab.length == 0) {
- //sizeCtl为0,取默认长度16,否则去sizeCtl的值
- int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
- @SuppressWarnings("unchecked")
- //基于初始长度,构建数组对象
- Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
- table = tab = nt;
- //计算扩容阈值,并赋值给sc
- sc = n - (n >>> 2);
- }
- } finally {
- //将扩容阈值,赋值给sizeCtl
- sizeCtl = sc;
- }
- break;
- }
- }
- return tab;
- }
2.图解
put方法的加锁原理如下图所示:
JDK 8扩容安全
1.源码分析
扩容过程的源码如下所示:
- private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
- int n = tab.length, stride;
- //如果是多cpu,那么每个线程划分任务,最小任务量是16个桶位的迁移
- if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
- stride = MIN_TRANSFER_STRIDE; // subdivide range
- //如果是扩容线程,此时新数组为null
- if (nextTab == null) { // initiating
- try {
- @SuppressWarnings("unchecked")
- //两倍扩容创建新数组
- Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
- nextTab = nt;
- } catch (Throwable ex) { // try to cope with OOME
- sizeCtl = Integer.MAX_VALUE;
- return;
- }
- nextTable = nextTab;
- //记录线程开始迁移的桶位,从后往前迁移
- transferIndex = n;
- }
- //记录新数组的末尾
- int nextn = nextTab.length;
- //已经迁移的桶位,会用这个节点占位(这个节点的hash值为-1--MOVED)
- ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
- boolean advance = true;
- boolean finishing = false; // to ensure sweep before committing nextTab
- for (int i = 0, bound = 0;;) {
- Node<K,V> f; int fh;
- while (advance) {
- int nextIndex, nextBound;
- //i记录当前正在迁移桶位的索引值
- //bound记录下一次任务迁移的开始桶位
-
- //--i >= bound 成立表示当前线程分配的迁移任务还没有完成
- if (--i >= bound || finishing)
- advance = false;
- //没有元素需要迁移 -- 后续会去将扩容线程数减1,并判断扩容是否完成
- else if ((nextIndex = transferIndex) <= 0) {
- i = -1;
- advance = false;
- }
- //计算下一次任务迁移的开始桶位,并将这个值赋值给transferIndex
- else if (U.compareAndSwapInt
- (this, TRANSFERINDEX, nextIndex,
- nextBound = (nextIndex > stride ?
- nextIndex - stride : 0))) {
- bound = nextBound;
- i = nextIndex - 1;
- advance = false;
- }
- }
- //如果没有更多的需要迁移的桶位,就进入该if
- if (i < 0 || i >= n || i + n >= nextn) {
- int sc;
- //扩容结束后,保存新数组,并重新计算扩容阈值,赋值给sizeCtl
- if (finishing) {
- nextTable = null;
- table = nextTab;
- sizeCtl = (n << 1) - (n >>> 1);
- return;
- }
- //扩容任务线程数减1
- if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
- //判断当前所有扩容任务线程是否都执行完成
- if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
- return;
- //所有扩容线程都执行完,标识结束
- finishing = advance = true;
- i = n; // recheck before commit
- }
- }
- //当前迁移的桶位没有元素,直接在该位置添加一个fwd节点
- else if ((f = tabAt(tab, i)) == null)
- advance = casTabAt(tab, i, null, fwd);
- //当前节点已经被迁移
- else if ((fh = f.hash) == MOVED)
- advance = true; // already processed
- else {
- //当前节点需要迁移,加锁迁移,保证多线程安全
- //此处迁移逻辑和jdk7的ConcurrentHashMap相同,不再赘述
- synchronized (f) {
- if (tabAt(tab, i) == f) {
- Node<K,V> ln, hn;
- if (fh >= 0) {
- int runBit = fh & n;
- Node<K,V> lastRun = f;
- for (Node<K,V> p = f.next; p != null; p = p.next) {
- int b = p.hash & n;
- if (b != runBit) {
- runBit = b;
- lastRun = p;
- }
- }
- if (runBit == 0) {
- ln = lastRun;
- hn = null;
- }
- else {
- hn = lastRun;
- ln = null;
- }
- for (Node<K,V> p = f; p != lastRun; p = p.next) {
- int ph = p.hash; K pk = p.key; V pv = p.val;
- if ((ph & n) == 0)
- ln = new Node<K,V>(ph, pk, pv, ln);
- else
- hn = new Node<K,V>(ph, pk, pv, hn);
- }
- setTabAt(nextTab, i, ln);
- setTabAt(nextTab, i + n, hn);
- setTabAt(tab, i, fwd);
- advance = true;
- }
- else if (f instanceof TreeBin) {
- TreeBin<K,V> t = (TreeBin<K,V>)f;
- TreeNode<K,V> lo = null, loTail = null;
- TreeNode<K,V> hi = null, hiTail = null;
- int lc = 0, hc = 0;
- for (Node<K,V> e = t.first; e != null; e = e.next) {
- int h = e.hash;
- TreeNode<K,V> p = new TreeNode<K,V>
- (h, e.key, e.val, null, null);
- if ((h & n) == 0) {
- if ((p.prev = loTail) == null)
- lo = p;
- else
- loTail.next = p;
- loTail = p;
- ++lc;
- }
- else {
- if ((p.prev = hiTail) == null)
- hi = p;
- else
- hiTail.next = p;
- hiTail = p;
- ++hc;
- }
- }
- ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
- (hc != 0) ? new TreeBin<K,V>(lo) : t;
- hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
- (lc != 0) ? new TreeBin<K,V>(hi) : t;
- setTabAt(nextTab, i, ln);
- setTabAt(nextTab, i + n, hn);
- setTabAt(tab, i, fwd);
- advance = true;
- }
- }
- }
- }
- }
- }
2.图解
JDK 8多线程扩容效率改进
多线程协助扩容的操作会在两个地方被触发:
①. 当添加元素时,发现添加的元素对用的桶位为fwd节点,就会先去协助扩容,然后再添加元素;
②. 当添加完元素后,判断当前元素个数达到了扩容阈值,此时发现sizeCtl的值小于0,并且新数组不为空,这时会去协助扩容。
1.源码分析
1.1 元素未添加,先协助扩容,扩容完后再添加元素
- final V putVal(K key, V value, boolean onlyIfAbsent) {
- if (key == null || value == null) throw new NullPointerException();
- int hash = spread(key.hashCode());
- int binCount = 0;
- for (Node<K,V>[] tab = table;;) {
- Node<K,V> f; int n, i, fh;
- if (tab == null || (n = tab.length) == 0)
- tab = initTable();
- else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
- if (casTabAt(tab, i, null,
- new Node<K,V>(hash, key, value, null)))
- break; // no lock when adding to empty bin
- }
- //发现此处为fwd节点,协助扩容,扩容结束后,再循环回来添加元素
- else if ((fh = f.hash) == MOVED)
- tab = helpTransfer(tab, f);
-
- //省略代码
- final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
- Node<K,V>[] nextTab; int sc;
- if (tab != null && (f instanceof ForwardingNode) &&
- (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
- int rs = resizeStamp(tab.length);
- while (nextTab == nextTable && table == tab &&
- (sc = sizeCtl) < 0) {
- if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
- sc == rs + MAX_RESIZERS || transferIndex <= 0)
- break;
- if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
- //扩容,传递一个不是null的nextTab
- transfer(tab, nextTab);
- break;
- }
- }
- return nextTab;
- }
- return table;
- }
1.2 先添加元素,再协助扩容
- private final void addCount(long x, int check) {
- //省略代码
-
- if (check >= 0) {
- Node<K,V>[] tab, nt; int n, sc;
- //元素个数达到扩容阈值
- while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
- (n = tab.length) < MAXIMUM_CAPACITY) {
- int rs = resizeStamp(n);
- //sizeCtl小于0,说明正在执行扩容,那么协助扩容
- if (sc < 0) {
- if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
- sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
- transferIndex <= 0)
- break;
- if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
- transfer(tab, nt);
- }
- else if (U.compareAndSwapInt(this, SIZECTL, sc,
- (rs << RESIZE_STAMP_SHIFT) + 2))
- transfer(tab, null);
- s = sumCount();
- }
- }
- }
注意:扩容的代码都在transfer方法中,这里不再赘述
2.图解
扩容过程原理如下图所示:
以上就是JDK 8中ConcurrentHashMap的初始化及扩容原理,现在你明白了吗?
添加下面学姐微信干货天天都不断。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。