赞
踩
HashMap是基于Map接口实现的,元素以key-value的方式存储在map在中,此实现提供所有可选的映射操作,并允许null的key和null的value。 HashMap类与Hashtable类大致等效,不同之处在于它不是线程安全的,并且允许为null。与1.7的主要区别在于数据结构上的不同,后续小节会详细介绍。HashMap的UML图如下图所示:
HashMap一共有4个构造方法,如下图所示:
/**
* 构造一个空的HashMap,默认初始容量为16,默认负载因子为0.75
*/
public HashMap() {
this.loadFactor = DEFAULT_LOAD_FACTOR; // all other fields defaulted
}
/**
* 构造一个空的HashMap,指定初始容量为initialCapacity,默认负载因子为0.75
*/
public HashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR);
}
/** * 构造一个空的HashMap,指定初始容量和负载因子 */ public HashMap(int initialCapacity, float loadFactor) { // 判断传入的初始容量是否大于0,如果小于0,则抛出相应异常信息。 if (initialCapacity < 0) throw new IllegalArgumentException("Illegal initial capacity: " + initialCapacity); // 判断传入的初始容量是否大于最大容量,如果大于最大容量,则将initialCapacity赋值为最大容量MAXIMUM_CAPACITY,1 << 30,1左移30位 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; // 如果负载因子loadFactor小于0 或者 如果指定的数字不是一个数字(NaN),返回{true,否则返回false if (loadFactor <= 0 || Float.isNaN(loadFactor)) throw new IllegalArgumentException("Illegal load factor: " + loadFactor); this.loadFactor = loadFactor; this.threshold = tableSizeFor(initialCapacity); }
/**
* 构造一个新的HashMap,Map集合作为参数。
*/
public HashMap(Map<? extends K, ? extends V> m) {
this.loadFactor = DEFAULT_LOAD_FACTOR;
putMapEntries(m, false);
}
常量:
成员变量:
JDK1.8中的HashMap与JDK1.7中的HashMap在数据结构上有较大的区别,1.7的数据结构由数组+单向链表数组,而1.8的数据结构由数组+链表+红黑树组成。在JDK1.8中使用一个静态内部类Node来代表HashMap的数据节点,这个跟JDK1.7中的Entry没啥区别,其中重要属性有hash,key,value,next。如下图所示:
Node只用于链表的情况,而红黑树的情况需要使用一个静态内部类TreeNode来代表HashMap的数据节点,其中重要属性有parent,left,right,pre,red。如下图所示:
根据JDK1.7 HashMap的介绍,在查找一个数据元素的时候,根据hash值能够快速的找到数组的具体索引位置,但是之后需要顺着链表一个一个的去比较找到对应的元素,时间复杂度取决于链表的长度,这种方式的时间复杂度为O(n)。
而JDK1.8中的HashMap为了降低这种查找数据元素带来的开销,在put元素的时候,当链表元素个数达到8个时,会将链表转换为红黑树。当在红黑树中进行数据查找的时候,可以降低一定的时间复杂度,这种方式的时间复杂度为O(logN)。
JDK1.8数据结构如下图所示:
我们都知道HashMap的底层是一个基于 Node<K,V>[] table 的数组来实现的,看完了上面的构造函数及其数据结构,我们发现数组并不是在构造函数中完成的,那是在哪里初始化的呢?带着这个疑问我们来看一下HashMap中的 put 方法。
/**
* 将map中的指定值与指定键关联。如果map中存在相同的key,key的旧值将被替换。
*/
public V put(K key, V value) {
return putVal(hash(key), key, value, false, true);
}
在此方法中就会去判断,如果是第一次put元素的话就会去初始化table数组,调用resize()方法进行数据的初始化,当然resize()不仅仅只是用于初始数组table,还进行后续的扩容处理,后续会详细介绍resize()方法。
/** * * @param hash key的hash值 * @param key 插入元素的key * @param value插入元素的value * @param onlyIfAbsent 如果onlyIfAbsent为true,在put的时候不替旧值 * @param evict 如果evict为false,表示这个table处于创建模式 * @return previous value, or null if none */ final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) { // tab表示table数组,p表示数组下标i处的元素,n表示table数组的长度,i表示table数组中的下标 Node<K,V>[] tab; Node<K,V> p; int n, i; // 如果table数组为空或者长度0,表示第一次put元素,则初始化table,会触发下面的 resize(),类似JDK1.7的第一次put也要初始化数组长度 // 第一次resize和后续的扩容有些不一样,因为这次是数组从null初始化到默认的16或自定义的初始容量 if ((tab = table) == null || (n = tab.length) == 0) n = (tab = resize()).length; if ((p = tab[i = (n - 1) & hash]) == null) // 如果table数组中下标i位置处的元素为null,则新创建一个Node放在数组中下标i位置(无哈希碰撞) tab[i] = newNode(hash, key, value, null); else {// table数组i下标位置处有值(有哈希碰撞) Node<K,V> e; K k; // 判断i位置处的第一个元素数据和要put的元素数据,key是不是相等,如果是相等,将p赋值给e if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) e = p; else if (p instanceof TreeNode) // 判断i位置处的元素数据节点是否是红黑树的节点,如果是,调用红黑树的插值方法 e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value); else { // 数组i位置处的元素数据为链表结构 for (int binCount = 0; ; ++binCount) { // i位置处链表节点的下一个节点为null if ((e = p.next) == null) { // 创建一个新的Node节点,并赋值给p的下一个节点(尾插法) p.next = newNode(hash, key, value, null); // TREEIFY_THRESHOLD 为 8,如果新插入的值是链表中的第8个元素,调用treeifyBin方法,将链表转换为红黑树 if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st treeifyBin(tab, hash); break; } // 在i位置处的链表中找到了相等的key if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) break; // 继续循环遍历链表 p = e; } } // e!=null,说明在map中存在key与要插入的元素key相等 if (e != null) { // existing mapping for key V oldValue = e.value; // 根据方法中的参数onlyIfAbsent判断是否要覆盖旧值 if (!onlyIfAbsent || oldValue == null) e.value = value; afterNodeAccess(e); // 返回旧值 return oldValue; } } ++modCount; // 新插入值后,判断map中元素个数是否已经超过的阈值,超过则需要对map进行扩容操作 // 这点跟JDK1.7有点不一样,1.7中是先判断是否超过阈值,如果超过阈值,则扩容,再去插入新值 if (++size > threshold) resize(); afterNodeInsertion(evict); return null; }
该方法用于初始化数组或者对数组进行扩容,每次扩容大小都为原来数组大小的2倍,并进行数据转移到新的table中。
final Node<K,V>[] resize() { // old table数组元素数据 Node<K,V>[] oldTab = table; // 旧数组的长度 int oldCap = (oldTab == null) ? 0 : oldTab.length; // 旧阈值 int oldThr = threshold; int newCap, newThr = 0; if (oldCap > 0) { // oldCap > 0,说明不是第一次put元素时调用resize if (oldCap >= MAXIMUM_CAPACITY) { // oldCap大于默认的最大值,扩容的阈值赋值为Integer.MAX_VALUE,返回旧的容量值 threshold = Integer.MAX_VALUE; return oldTab; } // 数组长度扩容为原来的2倍 else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY && oldCap >= DEFAULT_INITIAL_CAPACITY) // 阈值扩大一倍 newThr = oldThr << 1; // double threshold } // 当使用 new HashMap(int initialCapacity) 初始化后,第一次put元素的时候执行 else if (oldThr > 0) // initial capacity was placed in threshold newCap = oldThr; // 初始容量设置为阈值大小 // 当使用 new HashMap() 初始化后,第一次put元素的时候执行 else { // zero initial threshold signifies using defaults newCap = DEFAULT_INITIAL_CAPACITY; // 默认初始容量16 newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY); // 计算扩容的阈值 } // 阈值为 0 处理(哈希表还没有初始化但 threshold 已经被初始化) if (newThr == 0) { // 计算扩容阈值 float ft = (float)newCap * loadFactor; newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ? (int)ft : Integer.MAX_VALUE); } threshold = newThr; @SuppressWarnings({"rawtypes","unchecked"}) Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap]; // 使用新的newCap容量创建一个新的数组 table = newTab; if (oldTab != null) { // 开始遍历原数组,进行数据迁移。 for (int j = 0; j < oldCap; ++j) { // 记录当前桶位置的头节点 Node<K,V> e; if ((e = oldTab[j]) != null) { // 置 null(链表或树),让 GC 回收 oldTab[j] = null; // 如果e.next == null,说明该数组数组位置上只有一个元素,那就简单了,简单迁移这个元素就可以了 if (e.next == null) newTab[e.hash & (newCap - 1)] = e; // 如果该数组位置是红黑树节点,就调用红黑树的处理方式进去数据迁移 else if (e instanceof TreeNode) ((TreeNode<K,V>)e).split(this, newTab, j, oldCap); else { // preserve order // 代码执行到此,说明该数组位置处的元素是链表结构,且元素个数至少为2个 // 需要将此链表拆成两个链表,放到新的数组中,并且保留原来的先后顺序 // loHead、loTail 对应一条链表,hiHead、hiTail 对应另一条链表 Node<K,V> loHead = null, loTail = null; Node<K,V> hiHead = null, hiTail = null; Node<K,V> next; do { next = e.next; // 判断旧数组位置上的所有元素位于低位还是高位 if ((e.hash & oldCap) == 0) { // 元素在低位 if (loTail == null) loHead = e; else loTail.next = e; loTail = e; } else { // 元素在高位 if (hiTail == null) hiHead = e; else hiTail.next = e; hiTail = e; } } while ((e = next) != null); // 第一条链表数据,还是放在原来的j位置,低位(0)位置保持不变直接 rehash if (loTail != null) { loTail.next = null; // 第一条链表 newTab[j] = loHead; } // 第二条链表数据,放在新的位置 j + oldCap 处,高位(1)位置需要加上旧数组的容量 if (hiTail != null) { hiTail.next = null; newTab[j + oldCap] = hiHead; } } } } } return newTab; }
当在put元素的时候,发现put的元素是链表第8个元素时,调用此方法将链表转换为红黑树。至于将链表转换为红黑树的逻辑本文将不会暂开来分析,点到为止,后续会单独写一篇文章来讲解红黑树,各位拭目以待吧。
final void treeifyBin(Node<K,V>[] tab, int hash) { int n, index; Node<K,V> e; // 当前tab容量为空或者tab数组大小小于64的情况下,不会进行红黑树转换,使用的resize()代替,resize会增加一倍的容量 if (tab == null || (n = tab.length) < MIN_TREEIFY_CAPACITY) resize(); else if ((e = tab[index = (n - 1) & hash]) != null) { TreeNode<K,V> hd = null, tl = null; do { // 构建节点 TreeNode<K,V> p = replacementTreeNode(e, null); if (tl == null) hd = p; else { p.prev = tl; tl.next = p; } tl = p; } while ((e = e.next) != null); // 树根节点赋值给数组 if ((tab[index] = hd) != null) // 构造红黑树 hd.treeify(tab); } }
根据指定的key返回对应的value,如果map中不包含指定的key,则返回null。
public V get(Object key) { Node<K,V> e; return (e = getNode(hash(key), key)) == null ? null : e.value; } // 返回key对应的Node信息 final Node<K,V> getNode(int hash, Object key) { Node<K,V>[] tab; Node<K,V> first, e; int n; K k; // tab不为null,且数组长度n大于0,且first不为null if ((tab = table) != null && (n = tab.length) > 0 && (first = tab[(n - 1) & hash]) != null) { // 判断第一个节点是否为要查找的数据节点,如果是,则返回数据节点first if (first.hash == hash && // always check first node ((k = first.key) == key || (key != null && key.equals(k)))) return first; if ((e = first.next) != null) { // first节点为红黑树,调用getTreeNode方法获取数据信息 if (first instanceof TreeNode) return ((TreeNode<K,V>)first).getTreeNode(hash, key); do { // 循环遍历链表,在链表中查询是否有参数key对应的节点 if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } while ((e = e.next) != null); } } return null; }
在HashMap中如果要根据key删除这个key对应的键值对,需要调用remove(key)方法,该方法将会根据查找到匹配的键值对,将其从HashMap中删除,并且返回键值对的值。如果返回空,说明key可能不存在,也可能key对应的值就是null。如果想确定到底key是否存在可以使用containsKey方法判断是否存在。
/**
* 如果map中存在对应的key,则从map中删除指定key的数据节点信息。
*/
public V remove(Object key) {
Node<K,V> e;
return (e = removeNode(hash(key), key, null, false, true)) == null ?
null : e.value;
}
/** * 方法为final,不可被覆写,子类可以通过实现afterNodeRemoval方法来增加自己的处理逻辑 * * @param hash key的hash值,该值是通过hash(key)获取到的 * @param key 要删除的键值对的key * @param value 要删除的键值对的value,该值是否作为删除的条件取决于matchValue是否为true * @param matchValue 如果为true,则当key对应的键值对的值equals(value)为true时才删除;否则不关心value的值 * @param movable 删除后是否移动节点,如果为false,则不移动 * @return 返回被删除的节点对象,如果没有删除任何节点则返回null */ final Node<K,V> removeNode(int hash, Object key, Object value, boolean matchValue, boolean movable) { // 定义节点数组、当前节点、数组长度、索引值 Node<K,V>[] tab; Node<K,V> p; int n, index; // 如果tab数组不为null、数组长度n大于0、根据hash定位到的节点对象p(该节点为树的根节点或链表的首节点)不为null // 需要从该节点p向下遍历,找到那个和key匹配的节点对象 if ((tab = table) != null && (n = tab.length) > 0 && (p = tab[index = (n - 1) & hash]) != null) { // 定义要返回的节点对象node,一个临时节点变量e、键变量 k、值变量 v Node<K,V> node = null, e; K k; V v; // 如果当前节点p的key和需要删除的key相等,那么当前节点就是要删除的节点,赋值给node if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) node = p; //代码执行到此说明首节点没有匹配上要删除的key,那么检查下是否有next节点 //如果没有next节点,说明该节点所在位置上没有发生hash碰撞, 就一个节点并且还没匹配上,也就没得删了,最终也就返回null了 //如果存在next节点,说明该数组位置上发生了hash碰撞,此时可能存在一个链表,也可能是一颗红黑树 else if ((e = p.next) != null) { // 如果当前节点是一颗红黑树,则调用getTreeNode方法从树结构中查找满足条件的节点 if (p instanceof TreeNode) node = ((TreeNode<K,V>)p).getTreeNode(hash, key); // 如果不是红黑树节点,那么该节点就是一个链表结构,只需要从头到尾逐个遍历节点比对即可 else { do { // 如果节点e的key和要删除的key相等,e节点就是要删除的节点,赋值给node变量,调出循环 if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) { node = e; break; } // 走到这里,说明e也没有匹配上;把e节点赋值给当前节点p,这一步是让p存储的永远是下一次循环里e节点的父节点,如果下一次e匹配上了,那么p就是node的父节点 p = e; } while ((e = e.next) != null);// 如果e存在下一个节点,那么继续去匹配下一个节点。直到匹配到某个节点跳出或者遍历完链表所有节点 } } // 如果node不为空,说明根据key匹配到了要删除的节点 // 如果不需要对比value值或者需要对比value值且value值也相等,那么Node节点就是要删除的节点 if (node != null && (!matchValue || (v = node.value) == value || (value != null && value.equals(v)))) { if (node instanceof TreeNode)// 如果该节点是个红黑树节点,调用removeTreeNode方法移除该节点 ((TreeNode<K,V>)node).removeTreeNode(this, tab, movable); else if (node == p)// 如果该节点不是TreeNode对象,node == p 的意思是该node节点就是首节点 tab[index] = node.next;// 由于删除的是首节点,那么直接将节点数组对应位置指向到第二个节点即可 else// 如果node节点不是首节点,此时p是node的父节点,由于要删除node,所有只需要把p的下一个节点指向到node的下一个节点即可把node从链表中删除了 p.next = node.next; ++modCount;// HashMap的修改次数递增 --size;// HashMap的元素个数递减 afterNodeRemoval(node);// 调用afterNodeRemoval方法,该方法HashMap没有任何实现逻辑,目的是为了让子类根据需要自行覆写 return node; } } return null; }
在Java1.5中,并发编程大师Doug Lea给我们带来了concurrent包,而该包中提供的ConcurrentHashMap是线程安全并且高效的HashMap。在并发编程中使用HashMap put元素时可能会导致数据被覆盖,而使用线程安全的HashTable效率又低下。比如,在put元素的时候执行到此处代码,
此行代码是判断是否出现hash碰撞,假设两个线程A、B都在进行put操作,并且hash函数计算出的插入下标是相同的,当线程A执行完上图红框代码后由于时间片耗尽导致被挂起,而线程B得到时间片后在该下标处插入了元素,完成了正常的插入,然后线程A获得时间片,由于之前已经进行了hash碰撞的判断,所有此时不会再进行判断,而是直接进行插入,这就导致了线程B插入的数据被线程A覆盖了,从而线程不安全。
除此之外,在put元素的时候执行到下面这一样代码也会出现线程不安全问题。这行处有个++size,假设线程A、B,这两个线程同时进行put操作时,假设当前HashMap的size大小为5,当线程A执行到此行代码时,从主内存中获得size的值为5后准备进行+1操作,但是由于时间片耗尽只好让出CPU,线程B拿到CPU还是从主内存中拿到size的值5进行+1操作,完成了put操作并将size=6写回主内存,然后线程A再次拿到CPU并继续执行(此时size的值仍为5),当执行完put操作后,还是将size=6写回内存,此时,线程A、B都执行了一次put操作,但是size的值只增加了1,所有说还是由于数据覆盖又导致了线程不安全。
于是在多线程并发处理下,ConcurrentHashMap解决了HashMap在put操作的时候数据被覆盖问题。ConcurrentHashMapUML图如下图所示:
ConcurrentHashMap一共有5个构造方法,如下图所示:
/**
* 创建一个空的map,使用默认的初始容量大小(16)
*/
public ConcurrentHashMap() {
}
/**
* 创建一个空的map,指定初始容量的大小
*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
/**
* 创建一个空的map,指定初始容量和负载因子的大小
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
/**
* 创建一个空的map,指定初始容量、负载因子大小和并发更新线程的数量
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
可以发现,在new 出一个map 的实例时,并不会创建其中的数组等等相关的部件,只是进行简单的属性设置而已,同样的,table 的大小也被规定为必须是2的n次方数。真正的初始化在放在了是在向ConcurrentHashMap中插入元素的时候发生的。如调用put、computeIfAbsent、compute、merge 等方法的时候,调用时机是检查table==null。
常量
成员变量
JDK1.8 ConcurrentHashMap的数据结构跟JDK1.7 ConcurrentHashMap的数据结构有比较大的区别,取消了segments字段,直接采用transient volatile Node<K,V>[] table 保存数据,采用table数组元素作为锁,从而实现了对缩小锁的粒度,进一步减少并发冲突的概率,并大量使用了采用了CAS + synchronized 来保证并发安全性。
将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。对于hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash 之后散列的很均匀,那么table数组中的每个队列长度主要为0 或者1。但实际情况并非总是如此理想,虽然ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于元素个数超过8(默认值)的链表,jdk1.8 中采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。
使用Node(1.7 为Entry)作为链表的数据结点,仍然包含key,value,hash 和next 四个属性。红黑树的情况使用的是TreeNode(extends Node)。根据数组元素中,第一个结点数据类型是Node 还是TreeNode 可以判断该位置下是链表还是红黑树。
JDK1.8数据结构如下图所示:
在分析put操作之前,我们先来看看其中的几个核心方法,他们都是利用硬件级别的原子操作来操作数据,分别是tabAt(Node<K,V>[] tab, int i),casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v),setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)。
/**
* 利用硬件级别的原子操作,获得在i位置上的Node节点
* Unsafe.getObjectVolatile可以直接获取指定内存的数据,
* 保证了每次拿到数据都是最新的
*/
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
/**
* 利用CAS操作设置i位置上的Node节点
*/
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
/**
* 利用硬件级别的原子操作,设置在i位置上的Node节点
* Unsafe.putObjectVolatile可以直接设定指定内存的数据,
* 保证了其他线程访问这个节点时一定可以看到最新的数据
*/
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
ConcurrentHashMap的put方法就是沿用HashMap的put 方法的思想,根据hash 值
计算这个新插入的元素在table中的位置 i,如果 i 位置是空的,直接放进去即可;否则进行如下判断,如果 i 位置是树节点,则按照树的方式插入新的节点,否则把元素插入到 i 位置链表的末尾。
从整体流程上来看:
/** * * 将指定的key-value存储到map中,并且key和value都不能为空,如果传入的key或者value为空,则会报空指针异常。 * */ public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { // 如果key或者value为null,则抛出空指针异常 if (key == null || value == null) throw new NullPointerException(); // 计算hash值 int hash = spread(key.hashCode()); int binCount = 0; // 死循环,直到插入成功才跳出循环 for (Node<K,V>[] tab = table;;) { // f表示tab数组i位置处链表的头结点或者是红黑树的根节点;n表示tab数组的长度;i表示tab数组下标;fh表示是链表(fh>0)的节点还是树的节点 Node<K,V> f; int n, i, fh; // 如果tab为null或者tab的长度n为0,则需要对tab进行初始化操作。表示第一次put元素 if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 获取tab数组中i位置处的值,如果值为null,表示该位置没有值 // tab数组中i位置处没有值,使用CAS操作put元素 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) // 正在进行扩容,当前线程帮忙扩容 tab = helpTransfer(tab, f); else { V oldVal = null; // 锁Node数组中的元素,这个位置是Hash冲突组成链表的头结点或者是红黑树的根节点 synchronized (f) { // 判断i位置处的元素数据是否等于f节点的数据,只有相等才表示是i位置处的第一个元素 if (tabAt(tab, i) == f) { // fh>0 说明这个节点是一个链表的节点,不是树的节点 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; // put操作和putIfAbsent操作业务实现,判断Node节点中是否存在相同的key if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; // 判断是否替换旧值 if (!onlyIfAbsent) e.val = value; break; } // 新put元素的前驱节点 Node<K,V> pred = e; // 如果遍历到了最后一个结点,使用尾插法,把它插入在链表尾部 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // tab数组中第一个元素是一个树节点,按照树的方式插入值 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { // 达到临界值8 就需要把链表转换为树结构或者对数组进行扩容,后续会详细介绍treeifyBin方法。 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // Map的元素数量+1,并检查是否需要扩容 addCount(1L, binCount); return null; }
初始化方法,initTable()过程分析.从前面分析的构造方法中不难看出,在构造方法中并没有真正初始化,真正的初始化在放在了是在向ConcurrentHashMap中put元素的时候初始化的。具体实现的方法就是initTable()方法,详细分析如下。
/** * 初始化table,使用sizeCtl记录的大小 */ private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // sizeCtl < 0,表示有其他线程正在进行初始化操作 if ((sc = sizeCtl) < 0) // 让出当前线程CPU时间,对应table数组的初始化工作,只能有一个线程来处理 Thread.yield(); // lost initialization race; just spin // 使用CAS操作,将sizeCtl设置为-1,代表当前线程抢到了锁,可以对table进行初始化了 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { // 如果 sc > 0,使用初始化时提供的大小,否则使用DEFAULT_CAPACITY默认初始容量是16 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") // 初始化数组,长度为16或初始化时提供的长度 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 将初始化的数组 nt 赋值给 table,table 是 volatile 的,保证了对其他线程的可见性 table = tab = nt; // n右移2位,其实就是n变为n原值的1/4,也就是 sc = 0.75 * n,如果数组长度n为16,则这里 sc = 12, sc = n - (n >>> 2); } } finally { // 设置 sizeCtl 为 sc,扩容的阈值 sizeCtl = sc; } break; } } return tab; }
链表转红黑树,treeifyBin(Node<K,V>[] tab, int index)方法。用于将过长的链表转换为TreeBin对象,但是并不是直接转换红黑树,而是进行一次容量判断,如果容量没有达到转换的要求,直接进行扩容操作并返回;如果满足条件才将链表的结构转换为TreeBin,这与HashMap不同的是,它并没有把TreeNode直接放入红黑树,而是利用了TreeBin这个小容器来封装所有的TreeNode。接下来我们就对此方法进行详细的分析。
private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { // 如果tab数组长度n小于MIN_TREEIFY_CAPACITY(默认为64)的时候,不会将链表转换为红黑树 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // 对数组进行扩容操作 tryPresize(n << 1); // 获取节点信息,b是头结点 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 给节点b加锁 synchronized (b) { // 再次判断是否为同一节点信息 if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; // 遍历链表,建立一颗红黑树 for (Node<K,V> e = b; e != null; e = e.next) { // 创建一个节点p TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } // 将红黑树设置到数组相应位置中,利用了TreeBin这个小容器来封装所有的TreeNode setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
扩容操作,tryPresize(int size)方法。需要注意的是,调用tryPresize方法的时候,参数size已经翻倍,该过程需要结合transfer方法来完成完整的扩容操作。
sizeCtl变量:用来控制table的初始化和扩容操作。负数代表正在进行初始化或扩容操作,-1代表正在初始化,-N表示有N-1个线程正在进行扩容操作;0 为默认值,代表当时的table 还没有被初始化。正数表示初始化大小或Map 中的元素达到这个数量时,需要进行扩容了。
private final void tryPresize(int size) { // 如果size大于等于最大容量MAXIMUM_CAPACITY的一半,则c的值为MAXIMUM_CAPACITY,否则c值为:size * 1.5 + 1,再往上取最近的2的n次方。 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; //此时的sizeCtl是cap * 0.75,扩容阈值,如果sizeCtl为正数或者0 while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; // 这个if判断逻辑跟之前的初始化数组的initTable()方法代码基本一致,如果table还未进行初始化,则执行以下代码 if (tab == null || (n = tab.length) == 0) { // 计算数组长度 n = (sc > c) ? sc : c; //使用 CAS 将 sizeCtl 设为-1,表示 table 正在进行初始化操作 if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { // 表示table没有被其他线程修改过 if (table == tab) { @SuppressWarnings("unchecked") // 初始化一个Node数组,大小为n,并赋值给table Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); // 0.75 * n } } finally { // 无论发生什么情况,都要给sizeCtl赋值 sizeCtl = sc; } } } // 如果c的大小未达到扩容的阈值或者数组长度n大于等于最大容量值,则直接 break else if (c <= sc || n >= MAXIMUM_CAPACITY) break; else if (tab == table) { //tab == table说明还没开始迁移节点 /**返回table的生成戳stamp bits,每个n都有不同的生成戳stamp bits,结果必须是一个负数。 * static final int resizeStamp(int n) { * return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); * } * Integer.numberOfLeadingZeros(n)在指定 int 值的二进制补码表示形式中最高位(最左边)的 1 位之前,返回零位的数量 * 例如 n为16 0001 0000 则Integer.numberOfLeadingZeros(n)为27,因为n为2的幂次方,因此不同的n此结果也不同 * 然后与(1 << (RESIZE_STAMP_BITS - 1)) | ,相当于2^15 | n中0的个数。 * (因此其左移16位后符号位为1,结果肯定是个负数) */ int rs = resizeStamp(n); if (sc < 0) { Node<K,V>[] nt; /** * 1.第一个判断,sc右移 RESIZE_STAMP_SHIFT 位,也就是比较高 ESIZE_STAMP_BITS 位生成戳和rs是否相等,相等则代表是同一个n,是在同一容量下进行的扩容 * 2.第二个判断,sc == rs + 1,扩容结束了,不再有线程进行扩容。默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一,此时,sc 就等于 rs + 1。 * 3.第三个判断,判断当前帮助扩容线程数是否已达到MAX_RESIZERS最大扩容线程数 * 4.第四个判断,表示的是扩容新建的数组,如果nextTable 为null,表示已经转到扩容完成,nextTable已经被置为空。 * 5.第五个判断,transferIndex是数组分配索引,多线程扩容的基础,transferIndex <= 0 表示需要transfer的数组已经被分配完了。 */ if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // 扩容已经结束,中断循环 // 使用 CAS 操作将 sizeCtl 加 1,然后执行 transfer 方法,此时 nextTab 不为 null,sc 加 1 表示多了一个线程在帮助扩容 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } /** * 触发扩容(第一个进行扩容的线程) * 如果没有线程在进行扩容,那么使用CAS操作修改sizeCtl值,作为扩容的第一个发起线程,rs左移RESIZE_STAMP_SHIFT位+2,左移结果为一个负数,代表有一个线程正在进行扩容操作 * * 此时sizeCtl高 RESIZE_STAMP_BITS 位为生成戳,低 RESIZE_STAMP_SHIFT 位为扩容线程数 */ else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) //第一个发起transfer数据的线程,传入nextTable为null transfer(tab, null); } } }
tryPresize方法的核心在于 sizeCtl 值的操作,首先第一个线程来扩容的时候,将其设置为一个负数,然后执行 transfer(tab, null)方法,方法参数 nextTab 为 null;之后的循环将 sizeCtl 加 1,并执行 transfer(tab, nt)方法,方法参数 nextTab 不为 null。因此,可能的操作就是执行 1 次 transfer(tab, null) + 多次 transfer(tab, nt)。
如果其他线程正在扩容,则帮助其他线程一起对table进行扩容操作。跟踪该方法在putVal或者其他方法源码中的使用,发现只有在桶的头节点的hash值为Moved时才会调用,功能是帮助迁移节点
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; /** * 1、如果tab不为null,且节点f的hash值为MOVED,说明f节点是ForwardingNode,且nextTable不为null,才会执行该方法的以下逻辑 * 2、如果nextTable为null,表示已经transfer完成了 */ if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { /** * 1.第一个判断,sc右移 RESIZE_STAMP_SHIFT 位,也就是比较高 ESIZE_STAMP_BITS 位生成戳和rs是否相等,相等则代表是同一个n,是在同一容量下进行的扩容 * 2.第二个判断,sc == rs + 1,扩容结束了,不再有线程进行扩容。默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一,此时,sc 就等于 rs + 1。 * 3.第三个判断,判断当前帮助扩容线程数是否已达到MAX_RESIZERS最大扩容线程数 * 4.第五个判断,transferIndex是数组分配索引,多线程扩容的基础,transferIndex <= 0 表示需要transfer的数组已经被分配完了。 */ if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; //只要有一个线程来帮助transfer,sizeCtl就+1,初始值为(rs << RESIZE_STAMP_SHIFT) + 2)(具体请看tryPresize()方法中的逻辑),之后再transfer中会用到 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }
addCount(long x, int check)方法,用于记录map中节点的总数量,当总节点数量超过sizeCtl值时,则对数组执行扩容操作,将数组长度扩大为原来的2倍。
private final void addCount(long x, int check) { CounterCell[] as; long b, s; // 如果计数器单元表(counterCells)不为null或者计数器单元表为null,且对baseCount做+1(参数x)操作,且更新失败了,则执行以下代码 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { // 虽然对baseCount做+1操作失败了,但是s = b + x 依然保留了下来,s记录了当前map中总节点个数 CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } // 如果 check <= 1 ,则不作任何操作;调用addCount方法时,参数 binCount=1,说明是链表,且替换了head节点的val值 // 或者数组是空的, 添加新的head,就不做数组扩容的操作了 // binCount=1,就没必要对数据做扩容 if (check <= 1) return; // 用计数器单元表来保存所有总节点数量,并返回总节点数量 s = sumCount(); } if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; // 如果map中添加的节点总数s大于sizeCtl,则执行数组扩容操作,将数组的长度扩大为原来的2倍 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { /**返回table的生成戳stamp bits,每个n都有不同的生成戳stamp bits,结果必须是一个负数。 * static final int resizeStamp(int n) { * return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); * } * Integer.numberOfLeadingZeros(n)在指定 int 值的二进制补码表示形式中最高位(最左边)的 1 位之前,返回零位的数量 * 例如 n为16 0001 0000 则Integer.numberOfLeadingZeros(n)为27,因为n为2的幂次方,因此不同的n此结果也不同 * 然后与(1 << (RESIZE_STAMP_BITS - 1)) | ,相当于2^15 | n中0的个数。 * (因此其左移16位后符号位为1,结果肯定是个负数) */ int rs = resizeStamp(n); // 如果sc < 0,表示正在扩容 if (sc < 0) { /** * 1.第一个判断,sc右移 RESIZE_STAMP_SHIFT 位,也就是比较高 ESIZE_STAMP_BITS 位生成戳和rs是否相等,相等则代表是同一个n,是在同一容量下进行的扩容 * 2.第二个判断,sc == rs + 1,扩容结束了,不再有线程进行扩容。默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一,此时,sc 就等于 rs + 1。 * 3.第三个判断,判断当前帮助扩容线程数是否已达到MAX_RESIZERS最大扩容线程数 * 4.第四个判断,表示的是扩容新建的数组,如果nextTable 为null,表示已经转到扩容完成,nextTable已经被置为空。 * 5.第五个判断,transferIndex是数组分配索引,多线程扩容的基础,transferIndex <= 0 表示需要transfer的数组已经被分配完了。 */ if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // 扩容已经结束,中断循环 // 使用 CAS 操作将 sizeCtl 加 1,然后执行 transfer 方法,此时 nextTab 不为 null,sc 加 1 表示多了一个线程在帮助扩容 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } /** * 触发扩容(第一个进行扩容的线程) * 如果没有线程在进行扩容,那么使用CAS操作修改sizeCtl值,作为扩容的第一个发起线程,rs左移RESIZE_STAMP_SHIFT位+2,左移结果为一个负数,代表有一个线程正在进行扩容操作 * * 此时sizeCtl高 RESIZE_STAMP_BITS 位为生成戳,低 RESIZE_STAMP_SHIFT 位为扩容线程数 */ else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) //第二个参数为null会初始化新数组nextTable,确保只有一个线程新建table。始扩容。默认将数组长度扩容为原来的2倍。 transfer(tab, null); s = sumCount(); } } }
transfer()方法代码实在是有点长了,将原来的 tab 数组的元素迁移到新的 nextTab 数组中,不过各位不要担心看不懂,下面我们来慢慢分析。当 ConcurrentHashMap 容量不足的时候,需要对 table 进行扩容。这个方法的基本思想跟 HashMap 是很像的,但是由于它是支持并发扩容的,所以要复杂的多。
为何要并发扩容?因为在扩容的时候,总是会涉及到从一个“数组”到另一个“数组”拷贝的操作,如果这个操作能够并发进行,就能利用并发处理去减少扩容带来的时间影响。
首先我们先来看看有哪几个地方调用了transfer方法,一共会有以下3个地方调用transfer方法,它们分别是 addCount(long x, int check) 方法、helpTransfer(Node<K,V>[] tab, Node<K,V> f) 方法和tryPresize(int size) 方法。transfer方法支持多线程执行,在外围调用的时候会保证第一个发起数据迁移的线程在调用transfer方法的时候参数 nextTab 为null,之后再调用transfer方法的时候参数 nextTab 不为 null。
在阅读源码之前,先要理解并发操作的机制。原数组长度为 n,所以我们有 n 个迁移任务,让每个线程每次负责一个小任务是最简单的,每做完一个任务再检测是否有其他没做完的任务,帮助迁移就可以了,并发编程大师 Doug Lea 使用了一个 stride,默认值为 MIN_TRANSFER_STRIDE(16),简单理解就是步长,每个线程每次负责迁移其中的一部分,每次迁移 16 个小任务。
所以,我们就需要一个全局的调度者来安排哪个线程执行哪几个任务,这个就是属性 transferIndex 的作用。
第一个发起数据迁移的线程会将 transferIndex 指向原数组最后的位置,然后从后往前的 stride 个任务属于第一个线程,然后将 transferIndex 指向新的位置,再往前的 stride 个任务属于第二个线程,依此类推。当然,这里说的第二个线程不是真的一定指代了第二个线程,也可以是同一个线程,这个读者应该能理解吧。整个过程,其实就是将一个大的迁移任务分为了一个个任务包。
整个扩容操作分为两个部分:
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
整个扩容流程就是遍历和复制:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { // n表示旧table数组的长度;stride表示步长,也就是每个线程处理迁移的节点数 int n = tab.length, stride; /** * 1、单核CPU条件下 stride 为 n * 2、多核CPU条件下 stride 为 (n >>> 3) / NCPU,最小值为 MIN_TRANSFER_STRIDE(默认16) * 3、将n个任务拆分成多个任务包,每个任务包有stride个任务 */ if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range // 如果nextTab为null,需要进行初始化操作,只有第一个线程执行的时候需要执行以下初始化操作,参数nextTab为null,后续线程调用改方法的时候nextTab不为null if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") // 容量为原来table的2倍 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } // 将nextTab赋值给nextTable,nextTable为全局属性 nextTable = nextTab; // 将n赋值给transferIndex,transferIndex也是一个全局属性,用于控制迁移位置 transferIndex = n; } int nextn = nextTab.length; // 创建一个ForwardingNode节点,nextTab作为参数,key、value、next都为null,hash为MOVED,nextTable全局属性为nextTab // TODO 主要用于判断原数组中i位置处的节点是否迁移完成,原数组中位置i处的节点完成迁移工作后,就会将位置i处设置为这个ForwardingNode,用来告诉其他线程该位置已经处理过了,所以它其实相当于是一个标志。 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // advance为true表示可以继续迁移下一个节点,false表示停止迁移 boolean advance = true; // 是否结束迁移 boolean finishing = false; // to ensure sweep before committing nextTab // i是当前迁移位置的索引,bound是迁移的边界,需要注意的是从后往前的顺序 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; //一个while循环,循环里面有三个if条件判断语句,而且每个分支都有把advance=false //分析场景:table.length=32,此时执行到这个地方nextTab.length=64 A,B线程同时进行扩容。 //A,B线程同时执行到while循环中cas这段代码 //A线程获第一时间抢到资源,设置bound= nextBound=16 i = nextIndex - 1=31 A线程搬运table[31]~table[16]中间16个元素 //B线程再次回到while起点,然后在次获取到 bound = nextBound-0,i=nextIndex - 1=15,B线程搬运table[15]~table[0]中间16个元素 // advance为true,表示可以进行下一个节点的迁移,i指向了transferIndex,bound指向了transferIndex-stride while (advance) { int nextIndex, nextBound; // while循环迁移的 --i 条件,第一个条件一直为true,finishing为true则迁移任务结束 if (--i >= bound || finishing) advance = false; // 将上一次迁移之后的边界transferIndex值赋给nextIndex,如果transferIndex小于等于0,说明原数组的所有位置都有相应的线程去处理了,则当前线程不用transfer了 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //使用CAS操作,将nextBound赋值给transferIndex,nextBound = nextIndex - stride(上一个边界减去步长) else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { // nextBound表示这次迁移任务的边界,需要注意的是从后往前 bound = nextBound; i = nextIndex - 1; // 上一个边界-1,i即为开始迁移的位置 advance = false; } } // i < 0,表示迁移任务已经完成了,这个i在上面while循环中,当transferIndex=0时候,i=-1。 if (i < 0 || i >= n || i + n >= nextn) { int sc; //如果finishing=true,说明扩容所有工作完成,然后跳出循环 if (finishing) { nextTable = null; // 所有的迁移操作已经完成,将nextTable赋值为null table = nextTab; // 将新的nextTab赋值给table 属性,完成迁移 sizeCtl = (n << 1) - (n >>> 1); // 重新计算sizeCtl值,为新数组长度 n * 0.75 return; } /** * 1、在第一个线程调用改方法前,将sizeCtl设置为 (rs << RESIZE_STAMP_SHIFT) + 2,之后的每有一个线程参与迁移就会将 sizeCtl + 1 * 2、当前线程已经完成迁移工作,使用CAS操作将 sizeCtl - 1,代表做完了自己的任务 */ if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 不相等说明还有其他线程没完成迁移,该线程结束任务 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; //如果相等,则说明所有线程都完成任务了,只有最后一个线程能执行到这块代码,用来完成扩容最后一部分工作,将设置finish为true,会进入上面的if(finishing){} finishing = advance = true; i = n; // recheck before commit } } // 如果i位置处没有任何节点,则放入上面初始化的ForwardingNode节点,fwd为空节点 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //如果该位置处为ForwardingNode节点,则表示该位置已经被迁移过了,就可以开始迁移下一个节点了 else if ((fh = f.hash) == MOVED) advance = true; // already processed 都已经处理过了 else { // 对数组i位置处的节点进行加锁,开始迁移该位置处的数据 synchronized (f) { // 再次判断是否是同一个节点 if (tabAt(tab, i) == f) { Node<K,V> ln, hn; // fh >= 0,头节点的hash值大于0,表示节点是一个链表结构,就构造一个反序链表,把他们分别放在nextTable的i和i+n的位置上 if (fh >= 0) { // 将链表一分为二,找到原链表中的 lastRun,然后lastRun及其之后的节点是一起进行迁移的,lastRun之前的节点需要进行克隆,然后分到两个链表中 int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } // 一个链表放在新数组的i位置处 setTabAt(nextTab, i, ln); // 另一个链表放在新数组的i+n位置处 setTabAt(nextTab, i + n, hn); // 将原数组i位置处设置为 ForwardingNode节点,表示i位置处的数据已经处理完毕,其他线程只要看到该位置的hash值为MOVED,就不会进行迁移了 setTabAt(tab, i, fwd); // advance 设置为 true,代表该位置已经迁移完毕,返回到上面的while循环中,就可以执行i--操作了 advance = true; } // 节点为红黑树,迁移红黑树上的节点数据 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 一分为二之后,如果节点数小于等于树化阈值,则将红黑树转换为链表结构 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; // 将 ln 放置在新数组的i位置处 setTabAt(nextTab, i, ln); // 将 hn 放置在新数组的i + n位置处 setTabAt(nextTab, i + n, hn); // 将原数组i位置处设置为 ForwardingNode节点,表示i位置处的数据已经处理完毕,其他线程只要看到该位置的hash值为MOVED,就不会进行迁移了 setTabAt(tab, i, fwd); // advance 设置为 true,代表该位置已经迁移完毕,返回到上面的while循环中,就可以执行i--操作了 advance = true; } } } } } }
根据指定的key返回对应的value,如果map中不存在指定的key,则返回null。get方法比较简单,给定一个key来确定value的时候,必须满足两个条件key相同hash值相同,对于节点可能在链表或树上的情况,需要分别去查找。
public V get(Object key) { // 定义tab数组;当前节点e;当前节点是树p;tab数组的长度n;当前节点的hash值eh;当前节点的key值ek; Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; // 计算key的hash值 int h = spread(key.hashCode()); // 如果tab不为null,且tab的长度你大于0 ,且根据hash值计算出来tab位置处有值 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 如果当前节点的hash值与传入参数key的hash值相等,key也相等,则当前节点e就是要找的节点 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // 如果 eh < 0,说明这个节点在树上,调用树的find方法寻找节点数据 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; // 代码执行到这一步说明是个链表,遍历链表找到对应的值并返回 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
移除方法的基本流程和put 方法很类似,只不过操作由插入数据变为移除数据而已,而且如果存在红黑树的情况下,会检查是否需要将红黑树转为链表的步骤。
public V remove(Object key) { return replaceNode(key, null, null); } /** * 根据指定的 key 删除节点或者更新节点value * @param key 删除或者更新节点的 key * @param value 当 value==null 时 ,删除节点,否则更新节点的值为value。在replace方法中调用时参数 value 值不为 null。 * @param cv 一个期望值,当 map[key].value 等于期望值 cv 或者 cv==null的时候 ,删除节点,或者更新节点的值 * @return */ final V replaceNode(Object key, V value, Object cv) { int hash = spread(key.hashCode()); for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 如果 tab 为 null 或者 tab 长度 n 为0 再或者 tab 数组的 i 位置处值为 null,则直接返回 if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) break; // tab 数组正在扩容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; boolean validated = false; // 给当前节点加上同步锁 synchronized (f) { // 利用硬件级别的原子操作,获得在 i 位置上的 Node 节点,相等则说明第一个节点没有发生变化 if (tabAt(tab, i) == f) { // 如果 fh >= 0,表示链表节点 if (fh >= 0) { validated = true; for (Node<K,V> e = f, pred = null;;) { K ek; // 与当前节点 e 的 key 和 hash 值进行比较 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { // 当前节点的value值 V ev = e.val; // 符合更新 value 或者删除节点的条件 if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; // 更新 value,在 replace 方法中调用该方法时执行 if (value != null) e.val = value; // 删除非头节点 else if (pred != null) // 前驱节点不为 null // 前驱节点的后继节点为 e 的后继节点,即删除了 e 节点 pred.next = e.next; // 删除头节点 else setTabAt(tab, i, e.next); // 设置 tab 数组中下标为 i 的值为 e.next } break; } // 当前节点不是目标删除节点,继续遍历下一个节点 pred = e; // 已经遍历循环到了链表尾部,仍然没有找到需要删除的节点,则跳出循环 if ((e = e.next) == null) break; } } // 表示节点是红黑树节点 else if (f instanceof TreeBin) { validated = true; // 节点类型强制转换为 TreeBin TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; // 根节点不为 null,并且存在与指定hash和key相等的结点 if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; // 更新 value,在 replace 方法中调用该方法时执行 if (value != null) p.val = value; // 删除节点 else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } } } if (validated) { if (oldVal != null) { if (value == null) // 如果删除了节点,更新 map 中的 size 大小 addCount(-1L, -1); return oldVal; } break; } } } return null; }
本文主要讲解了JDK1.8中的HashMap和ConcurrentHashMap的基本介绍及其put、get、remove过程的源码分析,作者能力有限,难免会在某些地方分析得不到位,如有不正确的地方希望各位读者指正,大家一起进步,希望对各位有所帮助。
备注:博主微信公众号,不定期更新文章,欢迎扫码关注。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。