当前位置:   article > 正文

Java ConcurrentHashMap 高并发安全实现原理解析

Java ConcurrentHashMap 高并发安全实现原理解析

if (tab == null || (n = tab.length) == 0)

tab = initTable();

else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {

//创建占位Node

Node<K, V> r = new ReservationNode<K, V>();

//先锁定该占位Node

synchronized ® {

//将其设置到BIN的头节点

if (casTabAt(tab, i, null, r)) {

binCount = 1;

Node<K, V> node = null;

try {

//开始原子计算

if ((val = remappingFunction.apply(key, null)) != null) {

delta = 1;

node = new Node<K, V>(h, key, val, null);

}

} finally {

//设置计算后的最终节点

setTabAt(tab, i, node);

}

}

}

if (binCount != 0)

break;

} else if ((fh = f.hash) == MOVED)

tab = helpTransfer(tab, f);

else {

synchronized (f) {

if (tabAt(tab, i) == f) {

if (fh >= 0) {

//此处省略对普通链表的变更操作

} else if (f instanceof TreeBin) {

//此处省略对红黑树的变更操作

}

}

}

}

}

if (delta != 0)

addCount((long) delta, binCount);

return val;

}

3、如何保证原子性

=============

computeIfAbsent/computeIfPresent中判空与计算是原子操作,根据上述分析主要是通过casTabAt(tab, i, null, r)原子操作,或者使用ReserveNode占位并锁定的方式,或者锁住bin的头节点的方式来实现的。

也就是说整个bin一直处于锁定状态,在获取到目标KEY的value是否为空以后,其它线程无法变更目标KEY的值,判空与计算自然是原子的。

而casTabAt(tab, i, null, r)是由硬件层面的原子指令来保证的,能够保证同一个内存区域在compare和set操作之间不会有任何其它指令对其进行变更。

八、resize过程中的并发transfer

==========================

C13Map中总共有三处地方会触发transfer方法的调用,分别是addCount、tryPresize、helpTransfer三个函数。

  • addCount 用于写操作完成后检验元素数量,如果超过了sizeCtl中的阈值,则触发resize扩容和旧表向新表的transfer。

  • tryPresize 是putAll一次性插入一个集合前的自检,如果集合数目较大,则预先触发一次resize扩容和旧表向新表的transfer。

  • helpTransfer 是写操作过程中发现bin的头节点是ForwardingNode, 则调用helpTransfer加入协助搬运的行列。

1、开始transfer前的检查工作

======================

以addCount中的检查逻辑为例:

addCount中的transfer检查

Node<K, V>[] tab, nt;

int n, sc;

//当前的tableSize已经超过sizeCtl阈值,且小于最大值

while (s >= (long) (sc = sizeCtl) && (tab = table) != null &&

(n = tab.length) < MAXIMUM_CAPACITY) {

int rs = resizeStamp(n);

//已经在搬运中

if (sc < 0) {

if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||

sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||

transferIndex <= 0)

break;

//搬运线程数加一

if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))

transfer(tab, nt);

} else if (U.compareAndSwapInt(this, SIZECTL, sc,

(rs << RESIZE_STAMP_SHIFT) + 2))

//尚未搬运,当前线程是本次resize工作的第一个线程,设置初始值为2,非常巧妙的设计

transfer(tab, null);

s = sumCount();

}

多处应用了对变量sizeCtl的CAS操作,sizeCtl是一个全局控制变量。

参考下此变量的定义:private transient volatile int sizeCtl;

  • 初始值是0表示哈希表尚未初始化

  • 如果是-1表示正在初始化,只允许一个线程进入初始化代码块

  • 初始化或者reSize成功后,sizeCtl=loadFactor * tableSize也就是触发再次扩容的阈值,是一个正整数

  • 在扩容过程中,sizeCtrl是一个负整数,其高16位是与当前的tableSize关联的邮戳resizeStamp,其低16位是当前从事搬运工作的线程数加1

在方法的循环体中每次都将table、sizeCtrl、nextTable赋给局部变量以保证读到的是当前的最新值,且保证逻辑计算过程中变量的稳定。

如果sizeCtrl中高16位的邮戳与当前tableSize不匹配,或者搬运线程数达到了最大值,或者所有搬运的线程都已经退出(只有在遍历完所有槽位后才会退出,否则会一直循环),或者nextTable已经被清空,跳过搬运操作。

如果满足搬运条件,则对sizeCtrl做CAS操作,sizeCtrl>=0时设置初始线程数为2,sizeCtrl<0时将其值加1,CAS成功后开始搬运操作,失败则进入下一次循环重新判断。

首个线程设置初始值为2的原因是:线程退出时会通过CAS操作将参与搬运的总线程数-1,如果初始值按照常规做法设置成1,那么减1后就会变为0。

此时其它线程发现线程数为0时,无法区分是没有任何线程做过搬运,还是有线程做完搬运但都退出了,也就无法判断要不要加入搬运的行列。

值得注意的是,代码中的“sc == rs + 1 || sc == rs + MAX_RESIZERS“是JDK8中的明显的BUG,少了rs无符号左移16位的操作;JDK12已经修复了此问题。

2、并发搬运过程和退出机制

=================

C13Map的transfer方法

private final void transfer(Node<K, V>[] tab, Node<K, V>[] nextTab) {

int n = tab.length, stride;

//一次搬运多少个槽位

if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)

stride = MIN_TRANSFER_STRIDE;

if (nextTab == null) {

try {

//首个搬运线程,负责初始化nextTable

Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n << 1];

nextTab = nt;

} catch (Throwable ex) {

sizeCtl = Integer.MAX_VALUE;

return;

}

nextTable = nextTab;

//初始化当前搬运索引

transferIndex = n;

}

int nextn = nextTab.length;

//公共的forwardingNode

ForwardingNode<K, V> fwd = new ForwardingNode<K, V>(nextTab);

boolean advance = true;

boolean finishing = false; // 保证提交nextTable之前已遍历旧表的所有槽位

for (int i = 0, bound = 0; ; ) {

Node<K, V> f;

int fh;

//循环CAS获取下一个搬运区段

while (advance) {

int nextIndex, nextBound;

//搬运已结束,或者当前区段尚未完成,退出循环体;最后一次抄底扫描时,仅辅助做i减一的运算

if (–i >= bound || finishing)

advance = false;

else if ((nextIndex = transferIndex) <= 0) {

i = -1;

advance = false;

} else if (U.compareAndSwapInt

(this, TRANSFERINDEX, nextIndex,

nextBound = (nextIndex > stride ?

nextIndex - stride : 0))) {

bound = nextBound;

i = nextIndex - 1;

advance = false;

}

}

if (i < 0 || i >= n || i + n >= nextn) {

int sc;

if (finishing) {

nextTable = null;

table = nextTab;

sizeCtl = (n << 1) - (n >>> 1);

return;

}

if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {

//并非最后一个退出的线程

if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)

return;

finishing = advance = true;

//异常巧妙的设计,最后一个线程推出前将i回退到最高位,等于是强制做最后一次的全表扫描;程序直接执行后续的else if代码,看有没有哪个槽位漏掉了,或者说是否全部是forwardingNode标记;

//可以视为抄底逻辑,虽然检测到漏掉槽位的概率基本是0

i = n;

}

} else if ((f = tabAt(tab, i)) == null)

//空槽位直接打上forwardingNode标记,CAS失败下一次循环继续搬运该槽位,成功则进入下一个槽位

advance = casTabAt(tab, i, null, fwd);

else if ((fh = f.hash) == MOVED)

advance = true; //最后一次抄底遍历时,正常情况下所有的槽位应该都被打上forwardingNode标记

else {

//锁定头节点

synchronized (f) {

if (tabAt(tab, i) == f) {

Node<K, V> ln, hn;

if (fh >= 0) {

//…此处省略链表搬运代码:职责是将链表拆成两份,搬运到nextTable的i和i+n槽位

setTabAt(nextTab, i, ln);

setTabAt(nextTab, i + n, hn);

//设置旧表对应槽位的头节点为forwardingNode

setTabAt(tab, i, fwd);

advance = true;

} else if (f instanceof TreeBin) {

//…此处省略红黑树搬运代码:职责是将红黑树拆成两份,搬运到nextTable的i和i+n槽位,如果满足红黑树的退化条件,顺便将其退化为链表

setTabAt(nextTab, i, ln);

setTabAt(nextTab, i + n, hn);

//设置旧表对应槽位的头节点为forwardingNode

setTabAt(tab, i, fwd);

advance = true;

}

}

}

}

}

}

多个线程并发搬运时,如果是首个搬运线程,负责nextTable的初始化工作;然后借助于全局的transferIndex变量从当前table的n-1槽位开始依次向低位扫描搬运,通过对transferIndex的CAS操作一次获取一个区段(默认是16),当transferIndex达到最低位时,不再能够获取到新的区段,线程开始退出,退出时会在sizeCtl上将总的线程数减一,最后一个退出的线程将扫描坐标i回退到最高位,强迫做一次抄底的全局扫描。

3、transfer过程中的读写安全性分析

=========================

(1)首先是transfer过程中是否有可能全局的哈希表table发生多次resize,或者说存在过期的风险?

===========================================================

观察nextTable提交到table的代码,发现只有在所有线程均搬运完毕退出后才会commit,所以但凡有一个线程在transfer代码块中,table都不可能被替换;所以不存在table过期的风险。

(2)有并发的写操作时,是否存在安全风险?

=========================

因为transfer操作与写操作都要竞争bin的头节点的syncronized锁,两者是互斥串行的;当写线程得到锁后,还要做doubleCheck,发现不是一开始的头节点时什么事情都不会做,发现是forwardingNode,就会加入搬运行列直到新表被提交,然后去直接操作新表。

nextTable的提交总是在所有的槽位都已经搬运完毕,插上ForwardingNode的标识之后的,因此只要新表已提交,旧表必定无法写入;这样就能够有效的避免数据写入旧表。

推理:获取到bin头节点的同步锁开始写操作----------> transfer必然未完成--------->新表必然未提交-------→写入的必然是当前表。

也就说永远不可能存在新旧两张表同时被写入的情况,table被写入时nextTable永远都只能被读取。

(3)有并发的读操作时,是否存在安全风险?

=========================

transfer操作并不破坏旧的bin结构,如果尚未开始搬运,将会照常遍历旧的BIN结构;如果已搬运完毕,会调用到forwadingNode的find方法到新表中递归查询,参考上文中的forwadingNode介绍。

九、Traverser遍历器

==================

因为iterator或containsValue等通用API的存在,以及某些业务场景确实需要遍历整个Map,设计一种安全且有性能保证的遍历机制显得理所当然。

C13Map遍历器实现的难点在于读操作与transfer可能并行,在扫描各个bin时如果遇到forwadingNode该如何处理的问题。

由于并发transfer机制的存在,在某个槽位上遇到了forwadingNode,仅表明当前槽位已被搬运,并不能代表其后的槽位一定被搬运或者尚未被搬运;也就是说其后的若干槽位是一个不可控的状态。

解决办法是引入了类似于方法调用堆栈的机制,在跳转到nextTable时记录下当前table和已经抵达的槽位并进行入栈操作,然后开始遍历下一个table的i和i+n槽位,如果遇到forwadingNode再一次入栈,周而复始循环往复;

每次如果i+n槽位如果到了右半段快要溢出的话就会遵循原来的入栈规则进行出栈,也就是回到上一个上下文节点,最终会回到初始的table也就是initialTable中的节点。

C13Map的Traverser组件

static class Traverser<K,V> {

Node<K,V>[] tab; // current table; updated if resized

Node<K,V> next; // the next entry to use

TableStack<K,V> stack, spare; // to save/restore on ForwardingNodes int index; // index of bin to use next

int baseIndex; // current index of initial table

int baseLimit; // index bound for initial table

final int baseSize; // initial table size

Traverser(Node<K,V>[] tab, int size, int index, int limit) { this.tab = tab; this.baseSize = size; this.baseIndex = this.index = index; this.baseLimit = limit; this.next = null;

} /** * 返回下一个节点 */ final Node<K,V> advance() { Node<K,V> e; if ((e = next) != null)

e = e.next;

for (;

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/391513
推荐阅读
相关标签