ConcurrentHashMap

1. HashMap,HashTable,ConcurrentHashMap

线程不安全的HashMap

在多线程环境下(jdk1.8之前),使用HashMap进行put操作可能会引起死循环,导致CPU利用率接近100%,原因是多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry

效率低下的HashTable

HashTable容器使用synchronized来保证线程安全,也就是说,锁住整个数据结构来保证线程的安全性,当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞或轮询状态;

例如:线程1使用put方法进行元素添加,线程2不但不能使用put添加,也不能使用get方法获取,所以竞争越激烈,效率越低

ConcurrentHashMap的锁分段技术可有效提升并发访问效率

HashTable之所以在竞争激烈的并发环境下效率低下,是因为所有访问HashTable的线程都必须竞争同一把锁

对于ConcurrentHashMap,容器内有多把锁,每一把锁都用于容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,

从而可以有效的提高并发访问效率,总而言之,首先将数据分为一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段的数据的时候,其他段的数据也能被其他线程访问

2. ConcurrentHashMap的结构

image-20210116174321391

  • ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成
  • Segment是一种可重入锁,在ConcurrentHashMap里扮演可重入锁的角色
  • HashEntry用于存储键值对数据
  • 一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构,一个Segment包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry数组里的元素
  • 当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁

ConcurrentHashMap的初始化

segment构造方法

Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;//负载因子
            this.threshold = threshold;//阈值
            this.table = tab;//主干数组即HashEntry数组
        }

ConcurrentHashMap构造方法

public ConcurrentHashMap(int initialCapacity,
                               float loadFactor, int concurrencyLevel) {
          if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
              throw new IllegalArgumentException();
          //MAX_SEGMENTS 为1<<16=65536,也就是最大并发数为65536
          if (concurrencyLevel > MAX_SEGMENTS)
              concurrencyLevel = MAX_SEGMENTS;
          //2的sshif次方等于ssize,例:ssize=16,sshift=4;ssize=32,sshif=5
         int sshift = 0;
         //ssize 为segments数组长度,根据concurrentLevel计算得出
         int ssize = 1;
         while (ssize < concurrencyLevel) {
             ++sshift;
             ssize <<= 1;
         }
         //segmentShift和segmentMask这两个变量在定位segment时会用到,后面会详细讲
         this.segmentShift = 32 - sshift;
         this.segmentMask = ssize - 1;
         if (initialCapacity > MAXIMUM_CAPACITY)
             initialCapacity = MAXIMUM_CAPACITY;
         //计算cap的大小,即Segment中HashEntry的数组长度,cap也一定为2的n次方.
         int c = initialCapacity / ssize;
         if (c * ssize < initialCapacity)
             ++c;
         int cap = MIN_SEGMENT_TABLE_CAPACITY;
         while (cap < c)
             cap <<= 1;
         //创建segments数组并初始化第一个Segment,其余的Segment延迟初始化
         Segment<K,V> s0 =
             new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                              (HashEntry<K,V>[])new HashEntry[cap]);
         Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
         UNSAFE.putOrderedObject(ss, SBASE, s0);
         this.segments = ss;
     }

  初始化方法有三个参数,如果用户不指定则会使用默认值,initialCapacity为16,loadFactor为0.75(负载因子,扩容时需要参考),concurrentLevel为16。

  • Segment数组大小ssize是由ConcurrentLevel决定的,ssize一定是大于等于concurrentLevel的最小的2的次幂,原因可参考hashmap实现原理剖析

3. ConcurrentHashMap的操作

put操作

public V put(K key, V value) {
        Segment<K,V> s;
        //concurrentHashMap不允许key/value为空
        if (value == null)
            throw new NullPointerException();
        //hash函数对key的hashCode重新散列,避免差劲的不合理的hashcode,保证散列均匀
        int hash = hash(key);
        //返回的hash值无符号右移segmentShift位与段掩码进行位运算,定位segment
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }



final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);//tryLock不成功时会遍历定位到的HashEnry位置的链表(遍历主要是为了使CPU缓存链表),若找不到,则创建HashEntry。tryLock一定次数后(MAX_SCAN_RETRIES变量决定),则lock。若遍历过程中,由于其他线程的操作导致链表头结点变化,则需要重新遍历。
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;//定位HashEntry,可以看到,这个hash值在定位Segment时和在Segment中定位HashEntry都会用到,只不过定位Segment时只用到高几位。
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;              //若c超出阈值threshold,需要扩容并rehash。扩容后的容量是当前容量的2倍。这样可以最大程度避免之前散列好的entry重新散列,具体在另一篇文章中有详细分析,不赘述。扩容并rehash的这个过程是比较消耗资源的。
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }
  • put操作需要对共享变量进行写入操作,为了保证线程安全,必须加锁,这里采用的是synchronized隐式锁
  • 对于整个puy方法流程:put方法首先定位到segment,然后调用segment里的put方法进行插入操作
  • 插入操作需要两个步骤:首先判断是否需要对segment里的HashEntry数组进行扩容,第二步定位添加元素的位置,放入HashEntry数组里
  • 是否需要扩容:在插入元素前判断Segment里的HashEntry数组是否超过容量(threshold),如果超过则扩容,这点相对于HashMap更为恰当,因为HashMap是在插入元素后在判断是否已经到达容量,如果到达进行扩容,这样可能会导致一次无意义的扩容
  • 如何扩容:首先创建一个容量是原来容量两倍的数组,然后将原数组里的元素进行再散列后插入到新的数组里,为了高效,ConcurrentHashMap不会对整个容器进行扩容,而是只对某个segment进行扩容

get操作

public V get(Object key) {
        Segment<K,V> s; 
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;        //先定位Segment,再定位HashEntry
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }
  • get的实现较为高效简洁,先经过一次再散列,然后使用这个散列值通过散列运算定位到Segment,在通过散列算法定位到元素
  • 整个get过程不需要加锁,除非读到的值是空值才会加锁重读,因为get方法内使用的所有共享变量都被定义为volatile,在线程间保证可见性,能够被多线程同时读取,并且保证不会读到过期的值(happen before)

size操作

如果要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和

  • 即使Segment里的全局变量count是volatile类型的,但是也不能直接将所有count值相加,然后累计返回,因为累加前使用的count发生了变化,那么统计结果就不会正确
  • 最安全的做法就是在统计size的时候将所有Segment的put,remove,clean方法全部锁住,但是十分低效

由于在累加count操作过程中,之前累加过的count发生变化的几率非常小,所以我们采用以下算法

  • 先尝试2次通过不锁住Segment的方式来统计各个Segment的大小,如果统计过程中count发生变化,那么再采用加锁的方式统计所有Segment的大小
  • ConcurrentHashMap如何判断统计的过程中容器是否发生变化呢?
    • 使用modcount变量,在put,remove,clean方法中,操作元素前都会将变量modcount+1,那么只需要在统计size前后比较modcount的量是否发生变化即可得知容器大小是否发生变化

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!

线程池 Previous
AQS Next