在一个程序员的成长过程就一定要阅读源码,并且了解其中的原理,只有这样才可以深入了解其中的功能,就像ConCurrentHashMap 是线程安全的,到底是如何安全的?以及如何正确使用它?rehash在什么情况?红黑树存储原理?不了解其中源码原理是不行的。所以今天就写一篇Java源码的,关于ConCurrentHashMap的源码,大部分来自于网上摘抄,有的需要自己测试验证,现作为随笔记录于此,便于记录整理 ,并且分享给大家共同进步。
哈希表是非常高效的数据结构,在JAVA开发中,最常见最频繁使用的就是HashMap和HashTable,但是在线程的使用范围中就显得乏力了,简单来说,HashMap是线程不安全的,在并发中环境中,可能会形成环状链表,导致get操作时候,cpu空转,死循环。HashTable虽然和HashMap原理几乎一样,差别只有2个;第一是,不允许key和value为null;第二是,他是线程安全的。但是HashTable线程安全的策略简单粗暴,所有相关操作都是synchronized的,相当于给整个hash表加了一把大锁,在多线程访问的时候,只要有一个线程访问或者操作这个对象,那其他线程只能阻塞,相当于将所有操作串行化,牺牲了很大的性能。所以就需要性能好又安全的哈希表结构,所以就有了本文的主角ConCurrentMap。现在我们逐步说明不同版本的原理。
ConCurrentHashMap采用了锁分段技术:加入容器中有多把锁,没一把锁用于锁容器中一部分数据,那么当线程访问容器中里不同的数据段的数据时,线程间就不会存在锁竞争,从而就可以提高并发访问效率,简而言之,就是将数据一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也可能被其他线程访问。
ConCurrentHashMap的数据结构是由一个Segment数组和多个HashEntry组成,并且在代码中HashEntry和Segment是两个静态内部类,数据结构如下图所示:
Segment数组的意义就是将一个大的table分割成多个小的table 来进行加锁,也就是上面提到的锁分离技术,而每一个Segment元素存储的是HashEntry数组+链表,这个和HashMap的数据存储结构一样。HashEntry用来封装映射表的键/值对;Segment充当锁的角色,每个Segment对象守护散列表的若干个的桶。每个桶是由若干个HashEntry对象连接起来的链表。
ConCurrentHashMap不允许key为null,ConCurrentHashMap不允许空值的主要原因是,在非并发映射中几乎不能容忍的模糊性是无法容纳的,主要一点是如果map.get(key)返回null,则无法检测key是否显示映射为null或未映射,在非并发中你可以使用map.contains(key)进行检查,但在并发映射中,映射可能在调用之间发生变化。
继承关系:ConcurrentHashMap继承了AbstractMap类,同时实现了ConcurrentMap接口。
HashEntry 用来封装散列映射表中的键值对,可以理解为一个桶。在 HashEntry 类中,key,hash 和 next 域都被声明为 final 型,value 域被声明为 volatile 型。
1 /**
2 * ConcurrentHashMap list entry. Note that this is never exported
3 * out as a user-visible Map.Entry.
4 */
5 static final class HashEntry
6 final int hash;
7 final K key;
8 volatile V value;
9 volatile HashEntry
10
11 HashEntry(int hash, K key, V value, HashEntry
12 this.hash = hash;
13 this.key = key;
14 this.value = value;
15 this.next = next;
16 }
17
18 39 }
在 ConcurrentHashMap 中,在散列时如果产生“碰撞”,将采用“链地址法”来处理“碰撞”:把“碰撞”的 HashEntry 对象链接成一个链表。由于 HashEntry 的 next 域为 final 型,所以新节点只能在链表的表头处插入。下图是在一个空桶中依次插入 A,B,C 三个 HashEntry 对象后的结构图:
Segment 类继承于 ReentrantLock 类,从而使得 Segment 对象能充当锁的角色。每个 Segment 对象用来守护其(成员对象 table 中)包含的若干个桶。
table 是一个由 HashEntry 对象组成的数组。table 数组的每一个数组成员就是散列映射表的一个桶。 count 变量是一个计数器,它表示每个 Segment 对象管理的 table 数组(若干个 HashEntry 组成的链表)包含的 HashEntry 对象的个数。每一个 Segment 对象都有一个 count 对象来表示本 Segment 中包含的 HashEntry 对象的总数。
5 static final class Segment
下图是依次插入 ABC 三个 HashEntry 节点后,Segment 的结构示意图:
构造方法如下:
1 /**
2 * Creates a new, empty map with the specified initial
3 * capacity, load factor and concurrency level.
4 *
5 * @param initialCapacity the initial capacity. The implementation
6 * performs internal sizing to accommodate this many elements.
7 * @param loadFactor the load factor threshold, used to control resizing.
8 * Resizing may be performed when the average number of elements per
9 * bin exceeds this threshold.
10 * @param concurrencyLevel the estimated number of concurrently
11 * updating threads. The implementation performs internal sizing
12 * to try to accommodate this many threads.
13 * @throws IllegalArgumentException if the initial capacity is
14 * negative or the load factor or concurrencyLevel are
15 * nonpositive.
16 */
17 @SuppressWarnings("unchecked")
18 public ConcurrentHashMap(int initialCapacity,
19 float loadFactor, int concurrencyLevel) {
20 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
21 throw new IllegalArgumentException();
22 if (concurrencyLevel > MAX_SEGMENTS)
23 concurrencyLevel = MAX_SEGMENTS;
24 // Find power-of-two sizes best matching arguments
25 int sshift = 0;
26 int ssize = 1;
27 while (ssize < concurrencyLevel) {
28 ++sshift;
29 ssize <<= 1;
30 }
31 this.segmentShift = 32 - sshift;
32 this.segmentMask = ssize - 1;
33 if (initialCapacity > MAXIMUM_CAPACITY)
34 initialCapacity = MAXIMUM_CAPACITY;
35 int c = initialCapacity / ssize;
36 if (c * ssize < initialCapacity)
37 ++c;
38 int cap = MIN_SEGMENT_TABLE_CAPACITY;
39 while (cap < c)
40 cap <<= 1;
41 // create segments and segments[0]
42 Segment
43 new Segment
44 (HashEntry
45 Segment
46 UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
47 this.segments = ss;
48 }
这个构造函数有三个入参,如果用户不指定则会使用默认值,initialCapacity为16,loadFactor为0.75(负载因子,扩容时需要参考),concurrentLevel为16。现在我对初始化进行debug测试见下图:
从上图可以知道当走到第31行代码时 31 this.segmentShift = 32 - sshift; 参数变量的值为图上所示,初始化大小 initialCapacity为16,负载 loadFactor为0.75(负载因子,扩容需要参考),并行度concurrencyLevel为16,sshit为4:2的sshit次方等于ssize,ssize为16:这是segment数组的长度,根据concurrencyLevel算出,代码可知ssize一定是大于或者等于concurrencyLevel最小的2次幂,为什么Segment大小一定是2次幂,那是因为通过位与散列算法来定位Segment的index,通过上面的结果计算出segmentShift和segmentMask,即segmentShit是28,为什么32减去4呢,因为hash()方法的输出最大位数就是32位数,segmentMask是散列运算的掩码,这里值是15,
关于segmentShit和segmentMask,
这两个全局变量主要作用就是定位segment,int j =(hash >>> segmentShift) & segmentMask。
segmentMask:段掩码,假如segments数组长度为16,则段掩码为16-1=15;segments长度为32,段掩码为32-1=31。这样得到的所有bit位都为1,可以更好地保证散列的均匀性
segmentShift:2的sshift次方等于ssize,segmentShift=32-sshift。若segments长度为16,segmentShift=32-4=28;若segments长度为32,segmentShift=32-5=27。而计算得出的hash值最大为32位,无符号右移segmentShift,则意味着只保留高几位(其余位是没用的),然后与段掩码segmentMask位运算来定位Segment。
第一步debug就是确定Segment的数组长度为16,并且初始化定位segment的变量。现在进行第二步debug
当运行到42行代码时候,由上图所示确认了经过计算得出HashEntry的数组初始化长度为2,进行第三部debug
这里初始化了Segment,创建segments数组并初始化第一个Segment,其余的Segment延迟初始化,并且添加第0个元素
在源码中我们可以看出在构造ConCurrentHashMap中主要做3件事情。
1,确认并发度,也就是Segment的数组长度,并保证他是2的n次幂。
2,确认HashEntry数组的初始化长度,并保证他是2的n次幂。
3,将Segment初始化,并只是填充第0个元素。
第一步主要是验证传参以及定位segment的index,主要流程是先获取key的hash值,定位segment,调用segment的put方法。这里就需要上文初始化得到的segmentShit和segmentMask的值就是下图的22行代码。
1 /**
2 * Maps the specified key to the specified value in this table.
3 * Neither the key nor the value can be null.
4 *
5 *
The value can be retrieved by calling the get method
6 * with a key that is equal to the original key.
7 *
8 * @param key key with which the specified value is to be associated
9 * @param value value to be associated with the specified key
10 * @return the previous value associated with key, or
11 * null if there was no mapping for key
12 * @throws NullPointerException if the specified key or value is null
13 */
14 @SuppressWarnings("unchecked")
15 public V put(K key, V value) {
16 Segment
17 if (value == null)
18 throw new NullPointerException();
19 int hash = hash(key);
20 int j = (hash >>> segmentShift) & segmentMask;
21 if ((s = (Segment
22 (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
23 s = ensureSegment(j);
24 return s.put(key, hash, value, false);
25 }
hash方法源码为:也就是上图的19行代码:总的来说就是先拿到key的hashCode,然后对这个值进行再散列。
1 private int hash(Object k) {
2 int h = hashSeed;
3
4 if ((0 != h) && (k instanceof String)) {
5 return sun.misc.Hashing.stringHash32((String) k);
6 }
7
8 h ^= k.hashCode();
9
10 // Spread bits to regularize both segment and index locations,
11 // using variant of single-word Wang/Jenkins hash.
12 h += (h << 15) ^ 0xffffcd7d;
13 h ^= (h >>> 10);
14 h += (h << 3);
15 h ^= (h >>> 6);
16 h += (h << 2) + (h << 14);
17 return h ^ (h >>> 16);
18 }
返回第一张图put方法的23行的代码方法源码为:这里主要调用segment的put方法,
1 final V put(K key, int hash, V value, boolean onlyIfAbsent) {
2 HashEntry
3 scanAndLockForPut(key, hash, value);
4 V oldValue;
5 try {
6 HashEntry
7 int index = (tab.length - 1) & hash;
8 HashEntry
9 for (HashEntry
10 if (e != null) {
11 K k;
12 if ((k = e.key) == key ||
13 (e.hash == hash && key.equals(k))) {
14 oldValue = e.value;
15 if (!onlyIfAbsent) {
16 e.value = value;
17 ++modCount;
18 }
19 break;
20 }
21 e = e.next;
22 }
23 else {
24 if (node != null)
25 node.setNext(first);
26 else
27 node = new HashEntry
28 int c = count + 1;
29 if (c > threshold && tab.length < MAXIMUM_CAPACITY)
30 rehash(node);
31 else
32 setEntryAt(tab, index, node);
33 ++modCount;
34 count = c;
35 oldValue = null;
36 break;
37 }
38 }
39 } finally {
40 unlock();
41 }
42 return oldValue;
43 }
上图segment大致过程是:
index
,获取到头结点node
添加到数组index
索引位这里需要注意的是scanAndLockForPut方法,他在没有获取到锁的时候不仅会通过自旋获取锁,还会做一些其他的查找或新增节点的工,以此来提升put性能。
在上图代码有一段是调用scanAndLockForPut方法,对应行数第三行,该方法的源码是:
1 /**
2 * Scans for a node containing given key while trying to
3 * acquire lock, creating and returning one if not found. Upon
4 * return, guarantees that lock is held. UNlike in most
5 * methods, calls to method equals are not screened: Since
6 * traversal speed doesn't matter, we might as well help warm
7 * up the associated code and accesses as well.
8 *
9 * @return a new node if key not found, else null
10 */
11 private HashEntry
12 HashEntry
13 HashEntry
14 HashEntry
15 int retries = -1; // negative while locating node
16 while (!tryLock()) {
17 HashEntry
18 if (retries < 0) {
19 if (e == null) {
20 if (node == null) // speculatively create node
21 node = new HashEntry
22 retries = 0;
23 }
24 else if (key.equals(e.key))
25 retries = 0;
26 else
27 e = e.next;
28 }
29 else if (++retries > MAX_SCAN_RETRIES) {
30 lock();
31 break;
32 }
33 else if ((retries & 1) == 0 &&
34 (f = entryForHash(this, hash)) != first) {
35 e = first = f; // re-traverse if entry changed
36 retries = -1;
37 }
38 }
39 return node;
40 }
scanAndLockForPut方法在当前线程获取不到segment锁的情况下,完成查找或新建节点的工作。当获取到锁后直接将该节点加入链表即可,提升了put操作的性能。大致过程:
上面的Segment.rehash() 扩容方法就是segment的put方法第30行代码,Segment.rehash() 的源码是:
1 for (HashEntry
2 last != null;
3 last = last.next) {
4 int k = last.hash & sizeMask;
5 if (k != lastIdx) {
6 lastIdx = k;
7 lastRun = last;
8 }
9 }
10 newTable[lastIdx] = lastRun;
11 // Clone remaining nodes
12 for (HashEntry
13 V v = p.value;
14 int h = p.hash;
15 int k = h & sizeMask;
16 HashEntry
17 newTable[k] = new HashEntry
18 }
19 }
20 }
21 }
22 // 将新的节点加到对应索引位
23 int nodeIndex = node.hash & sizeMask; // add the new node
24 node.setNext(newTable[nodeIndex]);
25 newTable[nodeIndex] = node;
26 table = newTable;
27 }
在这里我们可以发现每次扩容是针对一个单独的Segment的,在扩容完成之前中不会对扩容前的数组进行修改,这样就可以保证get()
不被扩容影响。大致过程是:
e.hash & sizeMask;
计算key新的索引位table
现在进行put方法的debug测试:
第一步,传值put(2,"ww"),计算hash值,定位要保存的segment数组,
第二步,进入segment的put方法,值写入第7个segment数组中。在这一步需要执行的东西较多,但是比较重要的是,在写入前创建HashEentry节点初始化然后加锁,添加然后解锁。
我们可以看到get方法是没有加锁的,因为HashEntry的value和next属性是volatile的,volatile直接保证了可见性,所以读的时候可以不加锁。
1 public V get(Object key) {
2 Segment
3 HashEntry
4 int h = hash(key);
5 // 计算出Segment的索引位
6 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
7 // 以原子的方式获取Segment
8 if ((s = (Segment
9 (tab = s.table) != null) {
10 // 原子方式获取HashEntry
11 for (HashEntry
12 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
13 e != null; e = e.next) {
14 K k;
15 // key相同
16 if ((k = e.key) == key || (e.hash == h && key.equals(k)))
17 // value是volatile所以可以不加锁直接取值返回
18 return e.value;
19 }
20 }
21 return null;
22 }
debug测试get(2),原来就存在key==2的value,看下图,先计算hash值,然后计算segment索引位置,然后判断是否存在segment和hashTable然后循环对比key值,返回value,
1 public int size() {
2 // Try a few times to get accurate count. On failure due to
3 // continuous async changes in table, resort to locking.
4 final Segment
5 int size;
6 // true表示size溢出32位(大于Integer.MAX_VALUE)
7 boolean overflow; // true if size overflows 32 bits
8 long sum; // sum of modCounts
9 long last = 0L; // previous sum
10 int retries = -1; // first iteration isn't retry
11 try {
12 for (;;) {
13 // retries 如果retries等于2则对所有Segment加锁
14 if (retries++ == RETRIES_BEFORE_LOCK) {
15 for (int j = 0; j < segments.length; ++j)
16 ensureSegment(j).lock(); // force creation
17 }
18 sum = 0L;
19 size = 0;
20 overflow = false;
21 // 统计每个Segment元素个数
22 for (int j = 0; j < segments.length; ++j) {
23 Segment
24 if (seg != null) {
25 sum += seg.modCount;
26 int c = seg.count;
27 if (c < 0 || (size += c) < 0)
28 overflow = true;
29 }
30 }
31 if (sum == last)
32 break;
33 last = sum;
34 }
35 } finally {
36 // 解锁
37 if (retries > RETRIES_BEFORE_LOCK) {
38 for (int j = 0; j < segments.length; ++j)
39 segmentAt(segments, j).unlock();
40 }
41 }
42 //
43 return overflow ? Integer.MAX_VALUE : size;
44 }
size的核心思想是先进性两次不加锁统计,如果两次的值一样则直接返回,否则第三个统计的时候会将所有segment全部锁定,再进行size统计,所以size()尽量少用。因为这是在并发情况下,size其他线程也会改变size大小,所以size()的返回值只能表示当前线程、当时的一个状态,可以算其实是一个预估值。
debug进行测试:for体现了除非对比第二次执行break才可以跳出循环,
1 public boolean isEmpty() {
2 long sum = 0L;
3 final Segment
4 for (int j = 0; j < segments.length; ++j) {
5 Segment
6 if (seg != null) {
7 // 只要有一个Segment的元素个数不为0则表示不为null
8 if (seg.count != 0)
9 return false;
10 // 统计操作总数
11 sum += seg.modCount;
12 }
13 }
14 if (sum != 0L) { // recheck unless no modifications
15 for (int j = 0; j < segments.length; ++j) {
16 Segment
17 if (seg != null) {
18 if (seg.count != 0)
19 return false;
20 sum -= seg.modCount;
21 }
22 }
23 // 说明在统计过程中ConcurrentHashMap又被操作过,
24 // 因为上面判断了ConcurrentHashMap不可能会有元素,所以这里如果有操作一定是新增节点
25 if (sum != 0L)
26 return false;
27 }
28 return true;
29 }
和size方法一样这个方法也是一个若一致方法,最后的结果也是一个预估值。
ConcurrentHashMap 拥有更高的并发性。在 HashTable 和由同步包装器包装的 HashMap 中,使用一个全局的锁来同步不同线程间的并发访问。同一时间点,只能有一个线程持有锁,也就是说在同一时间点,只能有一个线程能访问容器。这虽然保证多线程间的安全并发访问,但同时也导致对容器的访问变成串行化的了。
ConcurrentHashMap 的高并发性主要来自于三个方面:
用分离锁(Segment锁)实现多个线程间的更深层次的共享访问;
用 HashEntery 对象的不变性来降低执行读操作的线程在遍历链表期间对加锁的需求``;
通过对同一个 Volatile 变量的写 / 读访问,协调不同线程间读 / 写操作的内存可见性;
参考链接:
https://my.oschina.net/xiaolyuh/blog/3080609
http://xiaobaoqiu.github.io/blog/2014/12/25/concurrenthashmap/
手机扫一扫
移动阅读更方便
你可能感兴趣的文章