当前位置:   article > 正文

并发容器ConcurrentHashMap原理解析及应用_concurrenthashmap并发扩容原理

concurrenthashmap并发扩容原理

前言

本篇文章主要解析juc包下面的并发容器 ConcurrentHashMap,1.7的实现,以及1.8之后做了哪些优化;对比hashmap做的优化,为什么在hashtable的源码注释上推荐使用ConcurrentHashMap对比。

HashMap之实现原理及hash碰撞

HashMap之扩容 数据迁移

ConcurrentHashMap 概述

HashMap 是集合Collection的重要成员,也是Map族最为在工作用到常用的一种,正是因为查找插入等效率高导致它经常被使用,这个原因因此有它的局限性;那就是它不保证多线程情况下,数据的安全性,一旦多线程操作hashMap,就可能出现线程安全问题,例如扩容时死循环问题;

从jdk1.1开始开发时,作者已经想到这个问题,因此有了hashtable这个类去解决问题,他当时觉得反正对于多线程操作map集合我们只要保证他的线程安全并不怎么管效率如何,加了synchronized关键字看起来至少了解决办法,要不然你就对多线程下操作map自己加锁,就是效率太低了。

Java 集合深入理解 (十四) :Hashtable实现原理研究

一直到jdk1.5版本的时候,有了一个质的飞跃,不得不说到一个人,那就是Doug Lea 大神 想到了不使用synchronized关键字,使用synchronized关键字极不易扩展的,而且不能自己控制,并且锁粒度太粗了,本来我多线程过来都是读操作,对数据并不冲突,我可以让他并发进行的,只对写有进行锁起来。对于map来,我们本来map就是链表数组操作,将map进行分组,多个线程分组去查询和写入这是可以并行处理,当然我只说了一部分,例如阻塞 队列,以及park和unpark,然后atomic 这些,都是juc包下面的实现,供给我们不一样的想法,我们可以将粒度降的很低。

继续说说ConcurrentHashMap 这个是jdk1.5  时,juc包提出时,就提出由这个类,当时提出将散列表,进行分组,多线程情况,按数据分组控制多线程。这在我看来已经挺好了,但是发觉在jdk1.8以后又继续优化,确实在不断的进步呀,因为在jdk1.8jvm给jdk提出了Unsafe 自旋操作,也是来源cpu在性能不断增加,以及内存不断增大,多线程在操作数据前做了一步检查,看老数据是否正确,就是这步操作,在ConcurrentHashMap并不需要分组操作,而只需要自旋操作,效率又做了很大的提升。对于jdk中ConcurrentHashMap来说它认为在使用过程中,不可能数据一直冲突把,如果是一直冲突,那我们自己业务代码肯定是需要修改的,来保证数据的准确性的。而且如果大量的多线程操作数据,大量自旋也是很消耗cpu 和内存,可能以后硬件的强度继续提升到非常大时,有可能又会有新的好算法好的思想出现,这篇文章主要也是研究ConcurrentHashMap是什么,也会从源码去分析,对比着hashmap去看,他们结构是非常相似的。

不外乎在原有操作新增了cas操作

  1. static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
  2. Node<K,V> c, Node<K,V> v) {
  3. return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
  4. }

 上图为整个map的继承实现图。ConcurrentHashMap 即实现了ConcurrentMap 提供线程安全性和原子性的{@link java.util.Map}保证。 又继承自AbstractMap 来实现基本map的基本方法 包括put remove size iterator迭代器等基础方法。

ConcurrentHashMap源码分析

jdk1.7中

更新操作之间允许的并发性由可选的concurrencyLevel构造函数参数 默认值16(这是来自于一个segments的长度为16),用作内部大小调整的提示。这个表是内部分区的,以尝试允许指定的无争用的并发更新数。因为安置在哈希表本质上是随机的情况下,实际的并发将变化。理想情况下,您应该选择一个值来容纳尽可能多的对象线程将同时修改表。使用远高于需要的价值可能会浪费空间和时间,并且显著较低的值可能会导致线程争用。但是 在一个数量级内高估和低估通常不会有太多明显的影响。

下面的属性和hashmap中字段是一致的,用来初始化散列表的大小和阈值。

  1. static final int DEFAULT_INITIAL_CAPACITY = 16;
  2. static final float DEFAULT_LOAD_FACTOR = 0.75f;

然后下面独有的默认的并发级别参数设置,这在1.8过后是取消掉的

  1. /**
  2. * 此表的默认并发级别,在构造函数中未另行指定时使用。
  3. */
  4. static final int DEFAULT_CONCURRENCY_LEVEL = 16;
  1. /**
  2. * 用于在段内建立索引的Shift值。
  3. */
  4. final int segmentShift;
  5. /** segments中每个元素都是一个专用的hashtable
  6. * The segments, each of which is a specialized hash table.
  7. */
  8. final Segment<K,V>[] segments;

put方法

  1. public V put(K key, V value) {
  2. Segment<K,V> s;
  3. if (value == null)
  4. throw new NullPointerException();
  5. int hash = hash(key); // 计算Hash值
  6. int j = (hash >>> segmentShift) & segmentMask; //计算下标j
  7. if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
  8. (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
  9. s = ensureSegment(j); //若j处有segment就返回,若没有就创建并返回
  10. return s.put(key, hash, value, false); //将值put到segment中去
  11. }

所有的操作都是segment 封装好的对象进行操作。计算出hash

主要Segment 进行封装起来,调用Segment 的put方法进行添加数据

Segment 从属性上看,就是一个hashtable.

而segment中put方法和hashmap中方法是一致的。

Segment方法看它怎么去保证数据线程安全,既然是juc下面的包,那肯定不会用synchronized,很不容易扩展。这点从继承方法就能看出。

static final class Segment<K, V> extends ReentrantLock implements Serializable {

针对put方法

  1. final V put(K key, int hash, V value, boolean onlyIfAbsent) {
  2. HashEntry<K, V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); // 如果tryLock成功,就返回null,否则。。。
  3. V oldValue;
  4. try {
  5. HashEntry<K, V>[] tab = table;
  6. int index = (tab.length - 1) & hash; // 根据table数组的长度 和 hash值计算index小标
  7. HashEntry<K, V> first = entryAt(tab, index); // 找到table数组在 index处链表的头部
  8. for (HashEntry<K, V> e = first;;) { //first开始遍历链表
  9. if (e != null) { // 若e!=null
  10. K k;
  11. if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { // 如果key相同
  12. oldValue = e.value; // 获取旧值
  13. if (!onlyIfAbsent) { // 若absent=false
  14. e.value = value; // 覆盖旧值
  15. ++modCount; //
  16. }
  17. break; // 若已经找到,就退出链表遍历
  18. }
  19. e = e.next; //key不相同,继续遍历
  20. } else { // 直到e为null
  21. if (node != null) // 将元素放到链表头部
  22. node.setNext(first);
  23. else
  24. node = new HashEntry<K, V>(hash, key, value, first); // 创建新的Entry
  25. int c = count + 1; // count 用来记录元素个数
  26. if (c > threshold && tab.length < MAXIMUM_CAPACITY) // 如果hashmap元素个数超过threshold,并且table长度小于最大容量
  27. rehash(node); // rehash跟resize的功能差不多,将table的长度变为原来的两倍,重新打包entries,并将给定的node添加到新的table
  28. else // 如果还有容量
  29. setEntryAt(tab, index, node); // 就在index处添加链表节点
  30. ++modCount; // 修改操作数
  31. count = c; //count+1
  32. oldValue = null; //
  33. break;
  34. }
  35. }
  36. } finally {
  37. unlock(); // 执行完操作后,释放锁
  38. }
  39. return oldValue; // 返回oldValue
  40. }

这里就是普通的HashEntry 。

这里保证数据线程安全的方法;就是下面的方法。 给对象进行加锁

HashEntry<K, V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);

这里这样做的好处还是在于每个没冲突的数据,保证多线程安全性。 散列数组每一个数据添加锁。

最后的现象,segment有多少个,并发量就有多少个。

jdk1.8

 这里说一点hashmap对于链表转换成红黑树和回退条件。

转换成红黑树,链表长度要达到8,以及数据数达到64进行转换;还是很困难的,因为泊松分布来看,一般更容易是扩容,而不是转换成红黑树

红黑树转链表,链表长度小于6,而发生条件,有两种一是扩容,也有可能删除元素。都有可能导致。

 

 主要利用的casTabAt 方法

  1. if (casTabAt(tab, i, null, //直接用CAS操作,i处的元素
  2. new Node<K,V>(hash, key, value, null)))
  3. break; // no lock when adding to empty bin 想emptybin中假如元素的时候,不需要加锁
  1. static final <K, V> boolean casTabAt(Node<K, V>[] tab, int i, Node<K, V> c, Node<K, V> v) {
  2. return U.compareAndSwapObject(tab, ((long) i << ASHIFT) + ABASE, c, v);
  3. }

这里对比着hashmap多了自旋操作,多线程下,有数据修改失败的情况

挂载到链表上,锁住头上存的元素, 添加了synchronized,如果链表数组元素为空时,就直接使用cas去修改数据,做自旋操作

  1. final V putVal(K key, V value, boolean onlyIfAbsent) {
  2. if (key == null || value == null) throw new NullPointerException();
  3. int hash = spread(key.hashCode()); //计算hash值
  4. int binCount = 0;
  5. for (Node<K,V>[] tab = table;;) { //自旋
  6. Node<K,V> f; int n, i, fh;
  7. if (tab == null || (n = tab.length) == 0) //table==null || table.length==0
  8. tab = initTable(); //就initTable
  9. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //若下标 i 处的元素为null
  10. if (casTabAt(tab, i, null, //直接用CAS操作,i处的元素
  11. new Node<K,V>(hash, key, value, null)))
  12. break; // no lock when adding to empty bin 想emptybin中假如元素的时候,不需要加锁
  13. }
  14. else if ((fh = f.hash) == MOVED) //若下标 i 处的元素不为null,且f.hash==MOVED MOVED为常量值-1
  15. tab = helpTransfer(tab, f); //
  16. else { //如果是一般的节点
  17. V oldVal = null;
  18. synchronized (f) { //当头部元素不为null,且不需要转换成树时,需要进行同步操作
  19. if (tabAt(tab, i) == f) {
  20. if (fh >= 0) { //若 链表头部hash值 >=0
  21. binCount = 1;
  22. for (Node<K,V> e = f;; ++binCount) {
  23. K ek;
  24. if (e.hash == hash &&
  25. ((ek = e.key) == key ||
  26. (ek != null && key.equals(ek)))) { //如果key相同
  27. oldVal = e.val;
  28. if (!onlyIfAbsent) //且不为absent
  29. e.val = value; //旧值覆盖新值
  30. break;
  31. }
  32. Node<K,V> pred = e;
  33. if ((e = e.next) == null), { //如果链表遍历完成,还没退出,说明没有相同的key存在,在尾部添加节点
  34. pred.next = new Node<K,V>(hash, key,
  35. value, null);
  36. break;
  37. }
  38. }
  39. }
  40. else if (f instanceof TreeBin) { //如果f是Tree的节点
  41. Node<K,V> p;
  42. binCount = 2;
  43. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  44. value)) != null) {
  45. oldVal = p.val;
  46. if (!onlyIfAbsent)
  47. p.val = value;
  48. }
  49. }
  50. }
  51. }
  52. if (binCount != 0) {
  53. if (binCount >= TREEIFY_THRESHOLD)
  54. treeifyBin(tab, i);
  55. if (oldVal != null)
  56. return oldVal;
  57. break;
  58. }
  59. }
  60. }
  61. addCount(1L, binCount);
  62. return null;
  63. }

相对于jdk1.7时,提升了没有hash碰撞时,做了优化使用cas操作。

1.8时,粒度更小了,并发量提高。

总结

整个concurrentHashMap对比各个版本去介绍他怎么达到多线程下数据安全,从hashtable开始到concurrenthashmap,体现出了java作者,对运行效率的优化和提升。在工作中根据业务去选择不同的集合容器,怎么样达到更快的效率,更好的内存。这都是需要研究和讨论的

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号