技术是为了解决问题而生的,ConcurrentHashMap 解决了多个线程同时操作一个 HashMap 时,可能出现的内部问题。当多个线程同时操作一个 HashMap 时,有可能会出现多线程同时修改一个共享变量(HashMap 类的成员变量),导致数据被覆盖,产生意想不到的错误。
ConcurrentHashMap 内部使用了锁和各种数据结构来保证访问 Map 是线程安全的。
ConcurrentHashMap 和 HashMap 底层的数组、链表结构几乎相同,底层对数据结构的操作思路是相同的。ConcurrentHashMap 除了数组 + 链表 + 红黑树的基本结构外,新增了转移节点结构(ForwardingNode)。
介绍转移节点(ForwardingNode)
转移节点是 ForwardingNode 结构, ForwardingNode 继承了 Node。ForwardingNode 节点的 hash 值固定为 -1。ForwardingNode 比 Node 多了一个 nextTable 成员变量,nextTable 成员变量的类型是 Node 数组。nextTable 成员变量是扩容之后的新数组。
如果数组在索引 i 上的结构是 ForwardingNode,那么表示这个哈希桶内的全部节点都已经转移到扩容之后的新数组,旧的哈希桶内的数据不能发生改变。转移节点(ForwardingNode)是为了保证 ConcurrentHashMap 扩容时的线程安全。保证了当一个哈希桶内的全部节点都已经转移到扩容之后的新数组后、扩容操作完成之前,旧的哈希桶内的数据不发生变化。
当一个哈希桶内的全部节点都已经转移到扩容之后的新数组后、扩容操作完成之前,如果有其他的线程执行 put 操作,需要将新增的节点 put 到旧的哈希桶内,那么这个线程会调用 helpTransfer() 方法帮助扩容。扩容完成之后,这个线程再将要新增的节点 put 到新的哈希桶内。
ConcurrentHashMap 在 put 方法上对数据结构的操作思路和 HashMap 相同,但 ConcurrentHashMap 的 put() 方法写了很多保障线程安全的代码。当调用 ConcurrentHashMap 的 put() 方法时,put() 方法的处理逻辑如下:
当调用 CHM 的 put() 方法时,如果 CHM 中已经存在要新增的 key,并且方法的入参 onlyIfAbsent 为 false,则替换旧值,并返回旧值。
ConcurrentHashMap 的扩容时机和 HashMap 相同,都是在 put() 方法的最后一步检查是否需要扩容。ConcurrentHashMap 扩容的方法叫做 transfer(),从 put() 方法的 addCount() 方法进去,就能找到 transfer() 方法。
如果 ConcurrentHashMap 中元素的数量超过了扩容的阈值(sizeCtl),那么它会调用 transfer() 方法执行扩容操作。ConcurrentHashMap 的扩容机制是扩容为原来容量的 2 倍。ConcurrentHashMap 扩容的处理逻辑和 HashMap 完全不同。
ConcurrentHashMap 扩容的大体思路如下:扩容需要把旧数组上的全部节点转移到扩容之后的新数组上,节点的转移是从数组的最后一个索引位置开始,一个索引一个索引进行的。每个线程一轮处理有限个数的哈希桶。当旧数组上的全部节点转移到扩容之后的新数组后,ConcurrentHashMap 的 table 成员变量指向扩容之后的新数组,扩容操作完成。transfer() 方法的处理逻辑如下:
首先根据 CPU 核数和 table 数组的长度,计算当前线程一轮处理哈希桶的个数。ConcurrentHashMap 的 transferIndex 成员变量会记录下一轮 或者是 下一个线程要处理的哈希桶的索引值 + 1
num = (tab.length >>> 3) / NCPU
的值:领取完任务之后线程就开始处理哈希桶内的节点。节点的转移是从数组的最后一个索引位置开始,一个索引一个索引进行的。在转移索引 i 上的节点之前,它会使用 synchronized 关键字给索引 i 上的结构加锁,保证同时最多只有一个线程操作索引 i 上的结构。
给索引 i 上的结构加锁之后,它会判断数组在索引 i 上的结构是链表 还是 红黑树,然后调用相应的节点转移代码。
如果索引 i 上的结构是链表,它通过将节点 key 的 hash 值 和 数组的长度 n 做与运算获得 n 对应的二进制表示中的 1 这一位在 hash 值中是 0 还是 1,即 b = p.hash & n
。获取到 b 之后,用 b 来判断要转移的节点是要挂到低位哈希桶里,还是挂到高位哈希桶里。遍历完链表,形成两个链表(低位链表、高位链表)之后,将链表的头节点赋值给对应的 tab[i]:
如果索引 i 上的结构是红黑树那么使用红黑树方式转移节点。
在将索引 i 上的全部节点转移到扩容之后的新数组后,它让旧数组 tab[i] 指向转移节点(ForwardingNode)。
当旧数组上的全部节点转移到扩容之后的新数组后,ConcurrentHashMap 的 table 成员变量指向扩容之后的新数组,扩容操作完成。
介绍低位哈希桶、高位哈希桶:如果 ConcurrentHashMap 当前的数组长度为 n 时,一个节点的 key 对应的哈希桶索引为 i。那么 ConcurrentHashMap 扩容之后数组长度为 2n 时,这个节点的 key 对应的低位哈希桶的索引为 i,对应的高位哈希桶的索引为 i + n。
ConcurrentHashMap 支持多线程扩容:
当调用 ConcurrentHashMap 的 get() 方法时,get() 方法的处理逻辑如下:
ConcurrentHashMap 的数组长度总是为 2 的幂次方。不论传入的初始容量是否为 2 的幂次方,最终都会转化为 2 的幂次方。
ConcurrentHashMap 中根据 key 计算出 hash 值,然后根据计算出的 hash 值计算出 key 对应的数组索引 i:
hash = (hashCode >>> 16) ^ hashCode
。i = hash & (n - 1)
。ConcurrentHashMap 的数组长度总是为 2 的幂次方设计的非常巧妙:
hash = (hashCode >>> 16) ^ hashCode
。使 key 的 hashCode 值高 16 位的变化映射到低 16 位中,使 hashCode 值高 16 位也参与后续索引 i 的计算,减少了碰撞的可能性。i = hash & (n - 1)
。由于数组的长度 n 是 2 的幂次方,n - 1 可以保证它的二进制表示中的后几位都是 1,n 对应的二进制位及之前的位都是 0。因此,计算出的数组索引 i 和 hash 值的二进制表示中后几位有关,而与前面的二进制位无关a % b == a & (b - 1)
。CPU 处理位运算比处理数学运算的速度更快,效率更高。b = p.hash & n
。获取到 b 之后,用 b 来判断要转移的节点是要挂到低位哈希桶里,还是挂到高位哈希桶里:当调用 ConcurrentHashMap 的 put() 方法时,put() 方法的最后一步是调用 addCount() 方法。
addCount() 方法的任务是:增加 ConcurrentHashMap 中元素的计数值。如果元素的数量超过了 ConcurrentHashMap 扩容的阈值(sizeCtl),那么就会调用 transfer() 方法执行扩容操作。如果此时有其他的线程已经在执行扩容操作,那么当前线程就协助扩容。
ConcurrentHashMap 采用了一些数据结构和手段来支持高效的并发计数。ConcurrentHashMap 使用 long 类型的 baseCount 成员变量和 CounterCell 类型的 counterCells 数组来支持高效的并发计数。
ConcurrentHashMap 的计数将线程竞争分散到 counterCells 数组的每一个元素,提高了并发计数的性能。
private transient volatile long baseCount;
// 如果 counterCells 数组不为空,则数组的长度为 2 的幂次方。
private transient volatile CounterCell[] counterCells;
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
手机扫一扫
移动阅读更方便
你可能感兴趣的文章