赞
踩
JDK提供的大部分容器都在java.util.concurrent包中。我先提纲挈领地介绍一下它们,初次露脸,大家只需要知道它们的作用即可。对于具体的实现和注意事项,后面我会一一道来。
除以上并发包中的专有数据结构外,java.util下的Vector是线程安全的(虽然性能和上述专用工具没得比),Collections工具类可以帮助我们将任意集合包装成线程安全的集合。
如果我们需要一个线程安全的HashMap,那么应该怎么做呢?一种可行的方法是使用Collections.synchronizedMap()方法包装我们的HashMap。如下代码产生的HashMap就是线程安全的:
public static Map map = Collections.synchronizedMap(new HashMap<>());
Collections.synchronizedMap()方法会生成一个名为SynchronizedMap的Map,它使用委托将所有与Map相关的功能交给HashMap实现,而自己则主要负责保证线程安全。
首先在SynchronizedMap内包装一个Map:
通过mutex实现对这个m的互斥操作。比如,get()方法的实现如下:
其他所有相关的Map操作都会使用这个mutex进行同步,从而实现线程安全。
一个更加专业的并发HashMap是ConcurrentHashMap,它位于java.util.concurrent包内,专门为并发进行了性能优化,更适合多线程的场合。
ConcurrentHashMap可以说是目前使用最多的并发数据结构之一,作为如此核心的基本组件,不仅要满足功能的需求,更要满足性能的需求,而实现一个高性能的线程安全的HashMap也绝非易事。下面我们就来看一看ConcurrentHashMap的内部实现,来体会一下它的精妙之处吧!
ConcurrentHashMap的实现从JDK8开始有了重大的变化,从JDK8到JDK14,并无太多不同,因此本节以JDK8为范本进行介绍。
从静态数据结构上来说,ConcurrentHashMap包含以下核心元素:
ConcurrentHashMap的核心元素中最值得注意的是Node,Node并非想象中那么简单。下图展示了ConcurrentHashMap的内部数据结构:
可以看到,Map中的Node并非简单的Node对象,实际上,它有可能是Node对象,也有可能是一个TreeBin或者ForwardingNode。
那么它什么时候是Node,什么时候是TreeBin,什么时候又是ForwardingNode呢?
其实在绝大部分场景下,使用的依然是Node。从Node的数组结构不难看出,Node其实是链表,也就是说,一个正常的Map大概率如下图所示:
可以看到,Node数组中的每一个元素实际上是链表的头部,这样,当元素的位置发生冲突时,不同的元素就可以存放在Node数组的同一个槽位中。
当数组槽位对应的是链表时,在链表中查找key只能使用简单的遍历,这在数据不多时,还是可以接受的,而当冲突数据比较多时,这种简单的遍历就有点慢了。因此,在具体实现中,当链表的长度大于且等于8时,会将链表树状化,也就是变成一棵红黑树。如下图所示:
其中一个槽位就变成了一棵树,这就是TreeBin(在TreeBin中使用TreeNode构造整棵树)。
当数组快满时,即**超过75%**的容量时,数组还需要进行扩容。在扩容过程中,如果老的数组已经完成了复制,那么就会将老的数组中的元素使用ForwardingNode对象替代,表示当前槽位的数据已经处理了,不需要再处理了。这样当有多个线程同时参与扩容时,就不会发生冲突。
现在来看一下HashMap最重要的方法——put()方法。
它负责将给定的key和value存入HashMap,它的工作主要有以下几个步骤:
下面根据以上主要步骤,来依次详细说明。
如果没有初始化数组,则尝试初始化数组。
初始化数组会生成一个Node数组:
默认情况下,n为16。同时设置sizeCtl为n-(n>>>2),这意味着sizeCtl为n的75%,表示Map的大小,也就是说ConcurrentHashMap的负载因子是0.75。(为了避免冲突,Map的容量是数组的75%,超过这个阈值就会扩容。)
如果当前正在扩容,则参与帮助扩容。
如果一个节点的hash是MOVE,则表示这是一个ForwardingNode,也就是当前正在扩容。为了尽快完成扩容,当前线程就会参与扩容的工作,而不是等待扩容操作完成。如此紧凑细致的操作,恰恰是ConcurrentHashMap高性能的原因。代码中的(fh=f.hash)==MOVE语义上等同于f instanceof ForwardingNode,但是使用整数相等的判断的效率要远远高于instanceof,所以这里也是一处对性能的优化。
将给定的key和value放入对应的槽位。
在大部分情况下,应该会走到这一步,也就是将key和value放入数组中,会使用如下操作:
可以看到,这里使用synchronized关键字锁住了Node对象。由于在绝大部分情况下,不同线程会操作不同的Node,因此这里的竞争应该不会太大。随着数组规模越来越大,竞争出现的概率会越来越小,所以ConcurrentHashMap就有了极好的并行性。
统计元素总数。
为了有一个高性能的size()方法,ConcurrentHashMap在CounterCell数组中使用了单独的方法来统计元素总数:
CounterCell使用了伪共享优化,具有很高的读写性能。counterCells中所有成员的value相加,就是整个Map的大小。这里使用数组,也是为了防止冲突。如果简单地使用一个变量,那么多线程累加计数时,难免有竞争,现在分散到一个数组中,这种竞争出现的概率就小了很多,对并发就更加友好了。
累加的主要逻辑如下:
触发扩容操作。
最后,ConcurrentHashMap还会检查是否需要扩容,它会检查当前Map的大小是否超过了阈值,如果超过了,则进行扩容。
ConcurrentHashMap的扩容过程非常巧妙,它并没有完全打乱当前已有的元素位置,而是在数组扩容2倍后,将一半的元素移到新的空间中。
所有的元素根据高位是否为1分为low节点和high节点:
接着,重新调整这些元素的位置:
下图显示了从8扩充到16时可能的扩容情况,注意,新位置总是在老位置的后面n个槽位(n为原数组大小):
这样做的好处是,每个元素的位置不需要重新计算,在进行查找时,由于总是会对n-1(一定是一个类似于1111 11111111111这样的二进制数)按位与,因此high节点自然就会出现在+n的位置上。
与put()方法相比,get()方法就比较简单了。它的工作步骤如下:
队列、链表之类的数据结构也是极其常用的,几乎所有的应用程序都与之相关。在Java中,ArrayList和Vector都使用数组作为其内部实现。两者最大的不同在于Vector是线程安全的,而ArrayList不是。此外,LinkedList使用链表的数据结构实现了List。但是很不幸,LinkedList并不是线程安全的,不过参考前面对HashMap的包装,我们也可以使用Collections.synchronizedList()方法来包装任意List:
public static List<String> list = Collections.synchronizedList(new LinkedList<>());
此时生成的List就是线程安全的了。
队列(Queue)也是常用的数据结构之一。在JDK中提供了一个ConcurrentLinkedQueue类用来实现高并发队列。从名字可以看出,这个队列使用链表作为其数据结构。有关ConcurrentLinkedQueue类的性能测试大家可以自行尝试,这里限于篇幅就不再给出性能测试的代码了。大家只需要知道ConcurrentLinkedQueue类应该算高并发环境中性能最好的队列。它之所以有很好的性能,是因为内部复杂的实现。
在这里,我更愿意花一些篇幅来简单介绍一下ConcurrentLinkedQueue类的具体实现细节。作为一个链表,自然需要定义链表内的节点,在ConcurrentLinkedQueue类中,核心定义如下:
其中item是用来表示目标元素的。比如,当列表中存放String时,item就是String类型。字段next表示当前Node的下一个元素,这样每个Node就能环环相扣,串在一起了。下图显示了ConcurrentLinkedQueue类的基本结构:
对Node进行操作时,使用了CAS:
casItem()方法用于设置当前Node的item值。它需要两个参数,第一个参数为期望值,第二个参数为目标值。当当前值等于期望值cmp时,就会将目标值设置为val。同样,casNext()方法也是类似的,但是它用于设置next字段,而不是item字段。
ConcurrentLinkedQueue类内部有两个重要的字段——head和tail,分别表示链表的头部和尾部,它们都是Node类型的。对于head来说,它永远不会为null,并且通过head及succ()后继方法一定能完整地遍历整个链表。对于tail来说,它自然应该表示队列的尾部。
ConcurrentLinkedQueue类的内部实现非常复杂,它允许在运行时,链表处于多个不同的状态。以tail为例,一般来说,我们期望tail总是为链表的尾部,但实际上,tail的更新并不是及时的,可能会产生拖延现象。下图显示了插入节点时tail的更新情况:
可以看到tail的更新会产生滞后,并且每次更新会跳跃两个元素。
下面是ConcurrentLinkedQueue类中向队列中添加元素的offer()方法(不同版本JDK的代码可能存在差异,但算法思想是一致的):
值得注意的是,这个方法没有任何锁操作。线程安全完全由CAS和队列的算法来保证。整个方法的核心是for循环,这个循环没有出口,直到尝试成功,这也符合CAS的流程。当第一次加入元素时,由于队列为空,因此p.next为null。程序进入第8行,将p的next节点赋值为newNode,也就是将新的元素加入队列中。此时p==t成立,因此不会执行第12行的代码更新tail。如果casNext()方法执行成功,则直接返回,如果失败,则再进行一次循环尝试,直到成功。因此,增加一个元素后,tail并不会被更新。
当程序试图增加第二个元素时,由于t还在head的位置上,因此p.next指向实际的第一个元素,因此第6行的q!=null表示q不是最后一个节点。由于向队列中添加元素需要使用最后一个节点的位置,因此循环开始查找最后一个节点。于是,程序会进入第23行,获得最后一个节点。此时,p实际上指向链表中的第一个元素,而它的next为null,故在第二次循环时,进入第8行。p更新自己的next,让它指向新加入的节点。由于此时p!=t成立,则更新t所在位置,将t移动到链表最后。
第17行处理了p==q的情况。这种情况是由于遇到了哨兵(sentinel)节点导致的。所谓哨兵节点,就是next指向自己的节点。这种节点在队列中的存在价值不大,主要表示要删除的节点或者空节点。当遇到哨兵节点时,由于无法通过next取得后续的节点,因此很可能直接返回head,期望通过从链表头部开始遍历,进一步找到链表末尾。一旦在执行过程中出现tail被其他线程修改的情况,则进行一次“打赌”,使用新的tail作为链表末尾(这样就避免了重新查找tail的开销)。
如果大家对Java不是特别熟悉,则可能会对下面的代码产生疑惑(第20行):
虽然只有短短一行,但是包含的信息比较多。首先“!=”并不是原子操作,它是可以被中断的。也就是说,在执行“!=”时,程序会先取得t的值,再执行t=tail,并取得新的t的值,然后比较这两个值是否相等。在单线程时,t!=t这种语句显然不成立。但是在并发环境中,有可能在获得左边的t值后,右边的t值被其他线程修改,这样t!=t就可能成立了,这里就是这种情况。如果在比较过程中,tail被其他线程修改,当它再次赋值给t时,就会导致等式左边的t和右边的t不同。如果两个t不相同,表示tail在中途被其他线程篡改了。这时,我们就可以用新的tail作为链表尾部,也就是这里等式右边的t。但如果tail没有被修改,则返回head,要求从头部开始重新查找尾部。
为了简化问题,我们考察t!=t的字节码(注意这里假设t为静态整型变量):
可以看到,在字节码层面,t被先后取了两次值,在多线程环境下,我们自然无法保证两次对t的取值是相同的,下图显示了这种情况:
下面我们来看一下哨兵节点是如何产生的:
上述代码第3行弹出队列内的元素,其执行过程如下:
由于队列中只有一个元素,根据前文的描述,此时tail并没有更新,而是指向和head相同的位置。而head本身的item为null,其next为列表第一个元素。故在第一轮循环中,代码直接进入第18行,将p赋值为q,而q就是p.next,也是当前列表中的第一个元素。接着,在第二轮循环中,p.item显然不为null(为字符串1),因此代码应该可以顺利进入第7行(如果CAS操作成功)。进入第7行,也意味着p的item被设置为null(因为这是弹出元素,自然需要删除)。此时p和h是不相等的(因为p已经指向原有的第一个元素了),故执行了第8行的updateHead()方法,其实现如下:
可以看到,在updateHead()方法中将p作为新的链表头部(通过casHead()方法实现),而原有的head就被设置为哨兵(通过lazySetNext()方法实现)。
这样一个哨兵节点就产生了,由于此时原有的head和tail实际上是同一个元素,因此再次用offer()方法插入元素时,就会遇到这个tail,也就是哨兵。这就是offer()方法的代码中第17行判断的意义。
通过以上说明,大家应该可以明显感觉到,不使用锁而单纯地使用CAS会要求在应用层面保证线程安全,并处理一些可能存在的不一致问题,大大增加了程序设计和实现的难度。它带来的好处就是可以使性能飞速提升,在有些场合也是值得的。
在很多应用场景中,读操作可能会远远多于写操作。比如,有些系统级别的信息,往往只需要加载或者修改很少的次数,但是会被系统内所有模块频繁访问。对于这种场景,我们最希望的就是读操作可以尽可能地快,而写操作即使慢一些也没有太大关系。
由于读操作根本不会修改原有的数据,因此对于每次读取都进行加锁其实是一种资源浪费。我们应该允许多个线程同时访问List的内部数据,毕竟读操作是安全的。根据读写锁的思想,读锁和读锁之间确实也不冲突。但是,读操作会受到写操作的阻碍,当写操作发生时,读操作就必须等待,否则可能读到不一致的数据。同理,当读操作正在进行时,程序也不能进行写操作。
为了将读操作的性能做到极致,JDK中提供了CopyOnWriteArrayList类。对它来说,读操作是完全不用加锁的,并且更好的消息是:写操作也不会阻塞读操作。只有写操作和写操作之间需要进行同步等待。这样,读操作的性能就会大幅度提升。它是怎么做的呢?
从这个类的名字我们可以看到,所谓CopyOnWrite就是在写入时,进行一次自我复制。换句话说,当这个List需要修改时,并不修改原有的内容(这对于保证当前读线程的数据一致性非常重要),而对原有的数据进行一次复制,将修改的内容写入副本中。写完之后,再用修改完的副本替换原来的数据,这样就可以保证写操作不影响读操作。
下面的代码展示了读操作的实现:
需要注意的是,读操作的代码中没有任何同步控制和锁操作,理由就是内部数组array不会发生修改,只会被另外一个array替换,因此可以保证数据安全。
和简单的读操作相比,写操作就有些麻烦了,其实现如下:
写操作使用了锁,当然这个锁仅限于控制写操作与写操作阻塞的情况。重点在于第7行代码进行了内部元素的完整复制,会生成一个新的数组newElements,将新的元素加入newElements,然后在第9行使用新的数组替换老的数组,修改就完成了。整个过程不会影响读操作,并且修改完,读线程可以立即“察觉”到这个修改(因为array变量的类型是volatile)。
前面我们已经提到了ConcurrentLinkedQueue类是高性能的队列。对于并发程序而言,高性能自然是一个需要追求的目标,但多线程的开发模式还会引入一个问题,那就是如何进行多个线程间的数据共享呢?比如,线程A希望给线程B发一条消息,用什么方式告知线程B是比较合理的呢?
一般来说,我们总是希望整个系统是松散耦合的。比如,你所在小区的物业希望可以得到一些业主的意见,设立了一个意见箱,业主对物业有任何要求或者意见都可以将其投递到意见箱里。作为业主的你并不需要直接找到物业相关的工作人员就能表达意见。实际上,物业的工作人员也可能经常发生变动,直接找工作人员未必是一件方便的事情。而不管是否发生了人员的变动,你投递到意见箱的意见总是会被物业的工作人员看到。这样你就可以很容易地表达自己的诉求了。你既不需要直接和他们对话,又可以轻松地提出自己的建议(这里假定物业公司的员工都是尽心尽责的好员工)。
将这个模式映射到程序中,也就是说我们既希望线程A能够通知线程B,又希望线程A不知道线程B的存在。这样,如果将来进行重构或者升级,我们完全可以不修改线程A,而直接把线程B升级为线程C,保证系统的平滑过渡。这中间的“意见箱”就可以使用BlockingQueue来实现。
与之前提到的ConcurrentLinkedQueue类或者CopyOnWriteArrayList类不同,BlockingQueue是一个接口,并非一个具体的实现类。它的主要实现如下图所示:
这里我们主要介绍ArrayBlockingQueue类和LinkedBlockingQueue类。从名字应该可以知道,ArrayBlockingQueue类是基于数组实现的,而LinkedBlockingQueue类是基于链表实现的。也正因为如此,ArrayBlockingQueue类更适合做有界队列,因为队列中可容纳的元素个数需要在队列创建时指定(毕竟数组的动态扩展不太方便)。而LinkedBlockingQueue类适合作为无界队列,或者那些边界值非常大的队列,因为其内部元素数量可以动态增加,它不会因为初始容量很大,而占据一大半的内存。
BlockingQueue之所以适合作为数据共享的通道,关键还在Blocking上。Blocking是阻塞的意思,当服务线程(服务线程指不断获取队列中的消息进而进行处理的线程)处理完成队列中所有的消息后,它如何知道下一条消息何时到来呢?
最简单的做法是让这个线程按照一定的时间间隔不停地循环和监控队列。这是一种可行的方案,但显然造成了不必要的资源浪费,而且循环周期也难以确定。BlockingQueue很好地解决了这个问题,它会让服务线程在队列为空时进行等待,当有新的消息进入队列后,自动将线程唤醒,如下图所示:
那么它是如何实现的呢?我们以ArrayBlockingQueue类为例,来一探究竟。
ArrayBlockingQueue类的内部元素都放置在一个对象数组中:
final Object[] items;
向队列中压入元素可以使用offer()方法和put()方法。对于offer()方法,如果当前队列已经满了,它就会立即返回false。如果没有满,则执行正常的入队操作。所以,我们不讨论这个方法。我们需要关注的是put()方法,put()方法也会将元素压入队列末尾,但如果队列满了,它会一直等待,直到队列中有空闲的位置。
从队列中弹出元素可以使用poll()方法和take()方法。它们都从队列的头部获得一个元素。不同之处在于:如果队列为空,poll()方法会直接返回null,而take()方法会等待,直到队列中有可用元素。
因此,put()方法和take()方法才是体现Blocking的关键。为了做好等待和通知这两件事,在ArrayBlockingQueue类内部定义了以下一些字段:
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
当执行take()方法时,如果队列为空,则让当前线程在notEmpty上等待。新元素入队时,则进行一次notEmpty上的通知。
下面的代码显示了take()方法的执行过程:
第6行代码就要求当前线程进行等待。当队列中有新元素时,线程会得到一个通知。下面是元素入队时的一段代码:
注意第5行,当新元素进入队列后,需要通知等待在notEmpty上的线程,让它们继续工作。
同理,对于put()方法也是一样的,当队列满时,需要让入队的线程等待,如下面第7行代码所示:
当有元素从队列中被挪走,队列中出现空位时,自然也需要通知等待入队的线程:
当队列中有空闲位置时,第7行代码通知等待入队的线程。
从实现上说,ArrayBlockingQueue类在物理上是一个数组,但在逻辑层面是一个环形结构。由于其数组的特性,其容量大小在初始化时就已经指定,并且无法动态调整。当有元素加入或者离开时,ArrayBlockingQueue类总是使用takeIndex和putIndex两个变量分别表示队列头部和尾部元素在数组中的位置。每一次入队和出队操作都会调整这两个重要的索引位置。下面的代码显示了对这两个索引的循环调整策略:
不难看出,上面两个函数将数组的头尾相接,从而实现了环形数组。
在JDK的并发包中,除常用的哈希表外,还实现了一种有趣的数据结构——跳表。跳表是一种可以用来快速查找数据的数据结构,有点类似于平衡树,它们的一个重要区别是:对平衡树的插入和删除往往很可能导致平衡树进行一次全局的调整,而对跳表的插入和删除只需要对整个数据结构的局部进行操作即可。这样带来的好处是:在高并发的情况下,你需要一个全局锁来保证平衡树的线程安全,而对于跳表,你只需要部分锁即可,这样在高并发环境下你就可以拥有更好的性能。就查询的性能而言,因为跳表的时间复杂度是O(log n),所以在并发数据结构中,JDK使用跳表来实现Map。
跳表的另外一个特点是随机算法,跳表的本质是同时维护多个链表,并且链表是分层的。下图是跳表结构示意图:
底层的链表维护了跳表内所有的元素,每上面一层链表都是下面一层链表的子集,一个元素插入哪些层是完全随机的。因此,如果运气不好,你可能会得到一个性能很糟糕的结构。但是在实际工作中,它的表现是非常好的。
跳表内的所有链表的元素都是排序好的。查找时,可以从顶层链表开始找。一旦发现被查找的元素大于当前链表中的取值,就会转入下一层链表继续找。这也就是说,在查找过程中,搜索是跳跃式的。例如,在跳表中查找元素7,如下图所示:
很显然,跳表是一种使用空间换时间的算法。
使用跳表实现Map和使用哈希算法实现Map的另一个不同之处是:哈希算法并不会保存元素的顺序,而跳表内所有的元素都是有序的。因此在对跳表进行遍历时,你会得到一个有序的结果。如果你的应用需要有序性,那么跳表就是你的最佳选择。
实现这一数据结构的类是ConcurrentSkipListMap。下面展示了跳表的简单使用方法:
跳表的内部由几个关键的数据结构组成。首先是Node,一个Node表示一个节点,里面含有key和value(就是Map的key和value)两个重要的元素。每个Node还会指向下一个Node,因此还有一个元素next:
对Node的所有操作,使用CAS,以JDK14为例,VarHandle对Node各个成员使用CAS读写(不同JDK版本中,实现CAS的细节有所不同,大家只需要知道这是一个CAS即可):
另外一个重要的数据结构是Index,顾名思义,它表示索引,内部包装了Node,同时增加了向下的引用和向右的引用:
整个跳表就是根据Index进行全网的组织的。
此外,对于每一层的表头还需要记录当前处于哪一层。为此,还需要一个名为HeadIndex的数据结构,表示链表头部的第一个Index,它继承自Index:
这样核心的内部元素就介绍完了。对于跳表的所有操作,就是组织好这些Index之间的连接关系。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。