1. AQS是什么

AQS的全称是Abstract Queued Synchronizer,即抽象队列同步器,它封装了一套多线程访问共享资源的模板,为同步类来实现同步提供了大量的细节解决方案

JUC中的ReentrantLockSemaphoreCountDownLatch等都是基于AQS来实现的

对于AQS来说,线程同步的关键是对状态值state进行操作,根据state是否属于一个线程,操作state的方式分为独占方式共享方式。在独占方式下获取和释放资源使用的方法为acquirerelease,在共享方式下的方法为acquireSharedreleaseShared

AQS内部成员结构

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    //...省略
    
    static final class Node {
        //先省略,后面再看
    }
    public class ConditionObject implements Condition, java.io.Serializable {
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
        //其他先省略,后面再看
    }
    
    //关键成员变量
    private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int state;
    
    //抽象方法部分,需要子类去具体实现
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    
    //CAS部分
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;
    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }
    
    //省略其他方法...
}

可以看见AQS主要由三个部分组成:一个volatile类型的变量state,一个带有头结点和尾结点指针的Node链队,还有一个ConditionObject内部类,而ConditionObject也是由Node链表组成的队列...

State变量

private volatile int state;

protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}
    
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

State在AQS中充当同步状态表示的作用,并且提供了get、set方法来修改同步状态。对于线程同步来说,获取、释放资源都是由state决定的,而state的具体语义需要实现类去自己定义,例如下面一个实现类的定义

  • ReentrantLock:表示是否有锁资源
  • ReentrantReadWriteLock:高16位表示读锁状态,低16位表示写锁状态
  • Semaphore:表示可用信号量的个数
  • CountDownLatch:表示计数器的值

Node内部类

static final class Node {
    //共享标记与独占标记
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    
    //waitStatus的四种取值
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    
    //节点等待状态
    volatile int waitStatus;
    //前驱
    volatile Node prev;
    //后继
    volatile Node next;
    //标记的线程
    volatile Thread thread;
    //特殊标记
    Node nextWaiter;
    //是否共享式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
   //获取前驱节点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    
    }
    
    //创建阻塞队列的构造方法
    Node(Thread thread, Node mode) {     
        this.nextWaiter = mode;
        this.thread = thread;
    }
    
    //创建条件队列的构造方法
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

可见每个Node都有一个记录Thread的标记,也就是每个Node可以对应一个Thread

其中waitStatus的四种取值代表的含义如下:

  • CANCELLED(1) 表示当前节点已经取消调度(timeout或者被中断)
  • SINGAL(-1) 表示后继节点在等待当前节点唤醒
  • CONDITION(-2) 表示节点在Condition的队列上
  • PROPAGATE(-3) 共享模式下的节点状态,前驱结点不仅会唤醒后驱节点,同时也可能唤醒后驱的后驱节点

特殊标记nextWaiter的含义如下

  • Node在AQS队列中时,表示共享式或者独占式节点标记
  • Node在Condition队列时,表示下一个Node节点的指针

2. AQS实现流程

关于线程被阻塞的流程,首先用一句话概括一下:当线程获取资源失败时,会被封装成Node节点从AQS队列的尾部入队,并阻塞线程。当某线程释放资源时会把AQS队列头部(实际上是第二个,因为有哨兵,后面详解)的线程唤醒,再次获取节点

节点的创建与入队

在AQS是使用的addWaiter函数完成

private Node addWaiter(Node mode) {
    //根据当前线程创建节点,等待状态为0
    Node node = new Node(Thread.currentThread(), mode);
    // 获取尾节点
    Node pred = tail;
    if (pred != null) {
        //如果尾节点不等于null,把当前节点的前驱节点指向尾节点
        node.prev = pred;
        //通过cas把尾节点指向当前节点
        if (compareAndSetTail(pred, node)) {
            //之前尾节点的下个节点指向当前节点
            pred.next = node;
            return node;
        }
    }
    //如果添加失败或队列不存在,执行enq函数
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) { //自旋
        //获取尾节点
        Node t = tail;
        if (t == null) {
            //如果尾节点为空,创建哨兵节点,通过cas把头节点指向哨兵节点
            if (compareAndSetHead(new Node()))
                //cas成功,尾节点指向哨兵节点
                tail = head;
        } else {
            //当前节点的前驱节点设指向之前尾节点
            node.prev = t;
            //cas设置把尾节点指向当前节点
            if (compareAndSetTail(t, node)) {
                //cas成功,哨兵的下个节点指向当前节点
                t.next = node;
                return t;
            }
        }
    }
}

关于enq函数的具体流程逻辑,可以看下面这个图来理解,图片转自微信公众号:程序猿阿星

多线程并发问题

当很多个线程同时执行的时候,上面的代码通过CAS+死循环自旋的方式,来保证每次只有一个线程可以成功,如果失败则下次重试。如果是N个线程的话则最拉的线程最多循环N次也能保证成功。

节点的出队

一句话概括:当持有资源的线程释放资源时,会将head.next指向的线程节点唤醒,也就是AQS队列哨兵节点的后继节点。如果唤醒的线程节点获取资源成功,线程节点清空信息并将其设置为哨兵节点,原哨兵节点直接出队

这个过程不需要CAS来保证,因为只有一个线程能够获取到资源

final boolean acquireQueued(final Node node, int arg) {
    //默认获取失败
    boolean failed = true;
    try {
        //线程是否被中断过
        boolean interrupted = false;
        for (;;) { //以自旋的方式等待获取资源
            final Node p = node.predecessor();
            //tryAcquire的具体逻辑由子类来实现
            if (p == head && tryAcquire(arg)) {
                //获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点
                setHead(node);
                //原来首节点下个节点指向为null
                p.next = null; // help GC
                //设置为获取成功
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) //被park处
                interrupted = true;
        }
    } finally {
        // 尝试获取资源失败并执行异常,取消请求,将当前节点从队列中移除
        if (failed)
            cancelAcquire(node);
    }
}

private void setHead(Node node) {
    //节点设置为头部
    head = node;
    //清空线程
    node.thread = null;
    //清空前驱节点
    node.prev = null;
}

获取与释放资源

acquire用于线程获取资源(独占)

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

虽然代码看上去很奇怪,但实际上逻辑就是:如果tryAcuqire失败(获取资源失败),则执行第二个if判断条件,也就是将当前线程封装成Node节点并插入到AQS队列中,并调用LockSupport.park(this)方法挂起自己,其中park方法位于上面的acquireQueued方法中的parkAndCheckInterrupt()部分

release用于释放资源(独占)

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

当一个线程调用release方法时,会尝试使用tryRelease操作释放资源(实际上是设置state的值),然后判断队列是否为null以及队头的waitStatus是否为0,如果为0则说明AQS队列为空,如果不为0则要执行unparkSuccessor方法,这个方法主要做的事情有:

  • 将队头元素waitStatus设置为0
  • 找到离队头最近的没有被cancel的节点(waitStatus小于等于0),并将其作为队头
  • 将这个节点唤醒(之后这个节点会继续执行acquireQueued的逻辑)

共享式释放资源的思路也是类似的,只是有些常量不一样,这里就不放出来了

3. 条件队列的支持

Object 的 wait、notify 函数是配合 Synchronized 锁实现线程间同步协作的功能,AQS 的 ConditionObject 条件变量也提供这样的功能。通过 ConditionObject 的 await 和 signal 两类函数完成。

每个ConditionObject都对应着一个条件队列,这个队列是带头尾节点的单链表

不同于 Synchronized 锁,一个 AQS 可以对应多个条件变量,而Synchronized只有一个,如下图

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //封装成node后进入condition队列
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) { 
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    //对队头第一个node执行signal操作
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    
    // CAS失败则回到上面自旋
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    
    //条件队列出队的节点会被添加到AQS队列中
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

逻辑总结来说就是:当某个线程执行了 ConditionObject 的 await 函数,阻塞当前线程。线程会被封装成 Node 节点添加到条件队列的末端,其他线程执行 ConditionObject 的 signal 函数,会将条件队列头部线程节点转移到 AQS 队列参与竞争资源.

和AQS队列不同的是,条件队列使用的是nextWaiter作为下个节点的指针,而AQS队列使用的是nextprev

Last modification:June 29th, 2021 at 03:24 pm