继上次看完HashMap
之后,这次来看一看技术含量更高的ConcurrentHashMap
JDK1.8之后的ConcurrentHashMap
的源码量可以说是完爆HashMap
了,毕竟要保证线程安全。对比HashTable
的话里面也采取了各种措施来避免加锁从而保证性能的可靠性(核心:CAS为主+Synchronized作补充)
构造方法
如果传入一个初始长度,则会由tableSizeFor
方法保证这个容量是2的幂次方
例如,如果构造时传60则创建的长度会变成64,传32的话也会变成64,详情看下面的源码
private transient volatile int sizeCtl;
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
public ConcurrentHashMap() {
}
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
sizeCtl含义
sizeCtl
是个相当灵活的变量,它一个人就充当了四个成员变量的任务(x
这里稍微总结一下它的具体表示为:
- 0:表示未初始化
- 正数:如果数组尚未初始化则记录初始容量,已经初始化后记录扩容阈值
- -1:正在初始化
- 负数:正在扩容,高15位为容量n ,低16位为并行扩容线程数+1
- 全大写的
SIZECTL
常量为它的偏移量,定义在静态代码块里面SIZECTL = U.objectFieldOffset(k.getDeclaredField("sizeCtl"))
初始化方法
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // 如果正在初始化或者正在扩容,则自旋
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //CAS操作,赋值为-1
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; //new了个node数组
table = tab = nt; //传给对象
sc = n - (n >>> 2); // sc = n - n/4的位运算形式
}
} finally {
sizeCtl = sc; // 此时sizeCtl为扩容阈值
}
break;
}
}
return tab;
}
常用方法
get方法
由于并发状态下的读取本来也没有什么安全隐患,因此get方法并没有什么特别的,和HashMap
的思路大体相同,这里就省略了
put方法
简直长的不要不要的,关键点有:
- 不允许空值
- 数组对应位置为空时采用CAS操作插入和判断,不加锁
- 如果数组位置不为空,要查找链表或者红黑树时,才会锁当前数组位置头结点元素,保证插入安全
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException(); //不允许空值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //使用CAS的方式看当前数组位置是否为空
if (casTabAt(tab, i, null, //双重检验,同样也是CAS
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f); //如果发现当前节点的正在扩容,则协助扩容
else {
V oldVal = null;
synchronized (f) { //终于用上了锁,保证当前数组位置元素尾插是安全的
if (tabAt(tab, i) == f) { //判断是否被其他线程变成了树
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) { //终于开始遍历链表了
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) { //对应红黑树的查找
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) { //和HashMap一样的逻辑,判断要不要变成树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i); //和HashMap一样,数组长度小于64时不会变树,而是先扩容数组
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount); //维护当前数组元素个数的方法,里面用到了类似于LongAddr的cell思想;同时还判断了是否需要扩容
return null;
}
spread方法
static final int HASH_BITS = 0x7fffffff
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
通过这个方法保证hash值永远是正数,HashMap
里面是没有的
sum count获取长度
和LongAdder
类似的baseCount
+cellCount
作为真正的长度
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
扩容机制
ConcurrentHashMap精彩的地方就在于这个扩容了,可以实现多线程协助扩容。扩容部分的代码是ConcurrentHashMap
中最长的一个部分了,也是最硬核的部分
协助扩容
当执行put、remove等方法时,其中都有一段检查当前节点是否处于扩容状态的代码,如果是的话,则帮助其扩容
要点
- 每个线程默认情况下会领取16个槽扩容的任务
- 每次进行数据迁移的时候都会将当前节点设置成为forwarding node,这个节点的特殊性在于除了其Hash字段为一个特定的MOVED字段值以外,其他都是null。也正是通过这个forwarding node来让其他的线程判断当前位置是否已经被处理了,以及给其他方法判断当前是不是正在发生扩容
- 扩容时会将每个槽的链表分为高低位两个部分,低位对应新数组原位置,高位对应新数组原位置+扩容大小的位置
协助扩容部分:
//主要逻辑:判断是否还正在扩容,如果是则帮助其扩容
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//数据校验
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
//判断要不要加入扩容
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 如果不满足线程数限制、以及扩容没结束,则进行协助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
扩容部分(硬核)
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//定义每个线程需要领取的槽的任务数stride
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
//如果新数组不存在则进行初始化扩容数组
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
//forwarding node节点,用于判断当前位置是否迁移结束
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true; //单个槽工作完成标识符,可以处理下一个位置
boolean finishing = false; //收尾工作标识符
//一次循环领取一次任务(16个槽)
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//领取数组区间的部分
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//如果迁移已经全部完成,则进行收尾工作
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null; //将nextTable设置为null表示扩容结束
table = nextTab; //将table转为新数组
sizeCtl = (n << 1) - (n >>> 1); //重新设置扩容阈值
return;
}
//每一次线程扩容结束就对sizeCtl进行CAS减一处理
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//如果当前线程不是扩容的最后一个线程,则回到上层循环继续准备领任务
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit <- 源代码的注释:重新检查防止遗漏的槽
}
}
else if ((f = tabAt(tab, i)) == null)
//如果当前数组位置为空则不迁移,直接放forwarding node
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
//如果当前位置碰到了forwarding node,则代表已经被其他线程迁移了
advance = true;
else {
synchronized (f) { //锁住当前位置的元素,开始进行数据迁移
//迁移过程中会将原数组分为高低两个部分
//低位部分会放在新数组的第i个位置,高位部分会放在i+n的位置
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//判断是否是链表
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
//遍历链表
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
//生成高低两个链表
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//链表的迁移具体步骤,均为调用CAS方法来完成迁移
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
//在原来的位置防止forwarding node
setTabAt(tab, i, fwd);
advance = true;
}
//如果不是链表,则对应红黑树的迁移
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//继续判断迁移完成后是否将红黑树进行转换
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
//标记当前位置迁移已经完成
advance = true;
}
}
}
}
}
}