继上次看完HashMap之后,这次来看一看技术含量更高的ConcurrentHashMap

JDK1.8之后的ConcurrentHashMap的源码量可以说是完爆HashMap了,毕竟要保证线程安全。对比HashTable的话里面也采取了各种措施来避免加锁从而保证性能的可靠性(核心:CAS为主+Synchronized作补充)

构造方法

如果传入一个初始长度,则会由tableSizeFor方法保证这个容量是2的幂次方

例如,如果构造时传60则创建的长度会变成64,传32的话也会变成64,详情看下面的源码

private transient volatile int sizeCtl;

public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
               MAXIMUM_CAPACITY :
               tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
}

public ConcurrentHashMap() {
}

private static final int tableSizeFor(int c) {
    int n = c - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

sizeCtl含义

sizeCtl是个相当灵活的变量,它一个人就充当了四个成员变量的任务(x

这里稍微总结一下它的具体表示为:

  • 0:表示未初始化
  • 正数:如果数组尚未初始化则记录初始容量,已经初始化后记录扩容阈值
  • -1:正在初始化
  • 负数:正在扩容,高15位为容量n ,低16位为并行扩容线程数+1
  • 全大写的SIZECTL常量为它的偏移量,定义在静态代码块里面SIZECTL = U.objectFieldOffset(k.getDeclaredField("sizeCtl"))

初始化方法

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // 如果正在初始化或者正在扩容,则自旋
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //CAS操作,赋值为-1
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; //new了个node数组
                    table = tab = nt; //传给对象
                    sc = n - (n >>> 2); // sc = n - n/4的位运算形式
                }
            } finally {
                sizeCtl = sc; // 此时sizeCtl为扩容阈值
            }
            break;
        }
    }
    return tab;
}

常用方法

get方法

由于并发状态下的读取本来也没有什么安全隐患,因此get方法并没有什么特别的,和HashMap的思路大体相同,这里就省略了

put方法

简直长的不要不要的,关键点有:

  • 不允许空值
  • 数组对应位置为空时采用CAS操作插入和判断,不加锁
  • 如果数组位置不为空,要查找链表或者红黑树时,才会锁当前数组位置头结点元素,保证插入安全
public V put(K key, V value) {
    return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException(); //不允许空值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //使用CAS的方式看当前数组位置是否为空
            if (casTabAt(tab, i, null,  //双重检验,同样也是CAS
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f); //如果发现当前节点的正在扩容,则协助扩容
        else {
            V oldVal = null;
            synchronized (f) {    //终于用上了锁,保证当前数组位置元素尾插是安全的
                if (tabAt(tab, i) == f) { //判断是否被其他线程变成了树
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) { //终于开始遍历链表了
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { //对应红黑树的查找
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) { //和HashMap一样的逻辑,判断要不要变成树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i); //和HashMap一样,数组长度小于64时不会变树,而是先扩容数组
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount); //维护当前数组元素个数的方法,里面用到了类似于LongAddr的cell思想;同时还判断了是否需要扩容
    return null;
}
spread方法
static final int HASH_BITS = 0x7fffffff
static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

通过这个方法保证hash值永远是正数,HashMap里面是没有的

sum count获取长度

LongAdder类似的baseCount+cellCount作为真正的长度

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

扩容机制

ConcurrentHashMap精彩的地方就在于这个扩容了,可以实现多线程协助扩容。扩容部分的代码是ConcurrentHashMap中最长的一个部分了,也是最硬核的部分

协助扩容

当执行put、remove等方法时,其中都有一段检查当前节点是否处于扩容状态的代码,如果是的话,则帮助其扩容

要点

  • 每个线程默认情况下会领取16个槽扩容的任务
  • 每次进行数据迁移的时候都会将当前节点设置成为forwarding node,这个节点的特殊性在于除了其Hash字段为一个特定的MOVED字段值以外,其他都是null。也正是通过这个forwarding node来让其他的线程判断当前位置是否已经被处理了,以及给其他方法判断当前是不是正在发生扩容
  • 扩容时会将每个槽的链表分为高低位两个部分,低位对应新数组原位置,高位对应新数组原位置+扩容大小的位置

协助扩容部分:

//主要逻辑:判断是否还正在扩容,如果是则帮助其扩容
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    //数据校验
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        int rs = resizeStamp(tab.length);
        //判断要不要加入扩容
        while (nextTab == nextTable && table == tab &&
               (sc = sizeCtl) < 0) {
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // 如果不满足线程数限制、以及扩容没结束,则进行协助扩容
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                transfer(tab, nextTab);
                break;
            }
        }
        return nextTab;
    }
    return table;
}

扩容部分(硬核)

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    //定义每个线程需要领取的槽的任务数stride
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; 
    //如果新数组不存在则进行初始化扩容数组
    if (nextTab == null) {            
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    //forwarding node节点,用于判断当前位置是否迁移结束
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true; //单个槽工作完成标识符,可以处理下一个位置
    boolean finishing = false; //收尾工作标识符
    //一次循环领取一次任务(16个槽)
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        //领取数组区间的部分
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSwapInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        //如果迁移已经全部完成,则进行收尾工作
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) { 
                nextTable = null; //将nextTable设置为null表示扩容结束
                table = nextTab; //将table转为新数组
                sizeCtl = (n << 1) - (n >>> 1); //重新设置扩容阈值
                return;
            }
            //每一次线程扩容结束就对sizeCtl进行CAS减一处理
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                //如果当前线程不是扩容的最后一个线程,则回到上层循环继续准备领任务
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit <- 源代码的注释:重新检查防止遗漏的槽
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            //如果当前数组位置为空则不迁移,直接放forwarding node
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            //如果当前位置碰到了forwarding node,则代表已经被其他线程迁移了
            advance = true; 
        else {
            synchronized (f) { //锁住当前位置的元素,开始进行数据迁移
                //迁移过程中会将原数组分为高低两个部分
                //低位部分会放在新数组的第i个位置,高位部分会放在i+n的位置
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    //判断是否是链表
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        //遍历链表
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        //生成高低两个链表
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        //链表的迁移具体步骤,均为调用CAS方法来完成迁移
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        //在原来的位置防止forwarding node
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    //如果不是链表,则对应红黑树的迁移
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        //继续判断迁移完成后是否将红黑树进行转换
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                        (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                        (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        //标记当前位置迁移已经完成
                        advance = true;
                    }
                }
            }
        }
    }
}
Last modification:June 18th, 2021 at 05:49 pm