1. AQS是什么
AQS的全称是Abstract Queued Synchronizer,即抽象队列同步器,它封装了一套多线程访问共享资源的模板,为同步类来实现同步提供了大量的细节解决方案
JUC中的ReentrantLock
、Semaphore
、CountDownLatch
等都是基于AQS来实现的
对于AQS来说,线程同步的关键是对状态值state进行操作,根据state是否属于一个线程,操作state的方式分为独占方式和共享方式。在独占方式下获取和释放资源使用的方法为acquire
和release
,在共享方式下的方法为acquireShared
和releaseShared
AQS内部成员结构
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
//...省略
static final class Node {
//先省略,后面再看
}
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
//其他先省略,后面再看
}
//关键成员变量
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
//抽象方法部分,需要子类去具体实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
//CAS部分
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
//省略其他方法...
}
可以看见AQS主要由三个部分组成:一个volatile类型的变量state,一个带有头结点和尾结点指针的Node链队,还有一个ConditionObject
内部类,而ConditionObject
也是由Node链表组成的队列...
State变量
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
State在AQS中充当同步状态表示的作用,并且提供了get、set方法来修改同步状态。对于线程同步来说,获取、释放资源都是由state决定的,而state的具体语义需要实现类去自己定义,例如下面一个实现类的定义
ReentrantLock
:表示是否有锁资源ReentrantReadWriteLock
:高16位表示读锁状态,低16位表示写锁状态Semaphore
:表示可用信号量的个数CountDownLatch
:表示计数器的值
Node内部类
static final class Node {
//共享标记与独占标记
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
//waitStatus的四种取值
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
//节点等待状态
volatile int waitStatus;
//前驱
volatile Node prev;
//后继
volatile Node next;
//标记的线程
volatile Thread thread;
//特殊标记
Node nextWaiter;
//是否共享式
final boolean isShared() {
return nextWaiter == SHARED;
}
//获取前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
//创建阻塞队列的构造方法
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
//创建条件队列的构造方法
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
可见每个Node都有一个记录Thread的标记,也就是每个Node可以对应一个Thread
其中waitStatus
的四种取值代表的含义如下:
- CANCELLED(1) 表示当前节点已经取消调度(timeout或者被中断)
- SINGAL(-1) 表示后继节点在等待当前节点唤醒
- CONDITION(-2) 表示节点在Condition的队列上
- PROPAGATE(-3) 共享模式下的节点状态,前驱结点不仅会唤醒后驱节点,同时也可能唤醒后驱的后驱节点
特殊标记nextWaiter
的含义如下
- Node在AQS队列中时,表示共享式或者独占式节点标记
- Node在Condition队列时,表示下一个Node节点的指针
2. AQS实现流程
关于线程被阻塞的流程,首先用一句话概括一下:当线程获取资源失败时,会被封装成Node节点从AQS队列的尾部入队,并阻塞线程。当某线程释放资源时会把AQS队列头部(实际上是第二个,因为有哨兵,后面详解)的线程唤醒,再次获取节点
节点的创建与入队
在AQS是使用的addWaiter
函数完成
private Node addWaiter(Node mode) {
//根据当前线程创建节点,等待状态为0
Node node = new Node(Thread.currentThread(), mode);
// 获取尾节点
Node pred = tail;
if (pred != null) {
//如果尾节点不等于null,把当前节点的前驱节点指向尾节点
node.prev = pred;
//通过cas把尾节点指向当前节点
if (compareAndSetTail(pred, node)) {
//之前尾节点的下个节点指向当前节点
pred.next = node;
return node;
}
}
//如果添加失败或队列不存在,执行enq函数
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) { //自旋
//获取尾节点
Node t = tail;
if (t == null) {
//如果尾节点为空,创建哨兵节点,通过cas把头节点指向哨兵节点
if (compareAndSetHead(new Node()))
//cas成功,尾节点指向哨兵节点
tail = head;
} else {
//当前节点的前驱节点设指向之前尾节点
node.prev = t;
//cas设置把尾节点指向当前节点
if (compareAndSetTail(t, node)) {
//cas成功,哨兵的下个节点指向当前节点
t.next = node;
return t;
}
}
}
}
关于enq
函数的具体流程逻辑,可以看下面这个图来理解,图片转自微信公众号:程序猿阿星
多线程并发问题
当很多个线程同时执行的时候,上面的代码通过CAS+死循环自旋的方式,来保证每次只有一个线程可以成功,如果失败则下次重试。如果是N个线程的话则最拉的线程最多循环N次也能保证成功。
节点的出队
一句话概括:当持有资源的线程释放资源时,会将head.next
指向的线程节点唤醒,也就是AQS队列哨兵节点的后继节点。如果唤醒的线程节点获取资源成功,线程节点清空信息并将其设置为哨兵节点,原哨兵节点直接出队
这个过程不需要CAS来保证,因为只有一个线程能够获取到资源
final boolean acquireQueued(final Node node, int arg) {
//默认获取失败
boolean failed = true;
try {
//线程是否被中断过
boolean interrupted = false;
for (;;) { //以自旋的方式等待获取资源
final Node p = node.predecessor();
//tryAcquire的具体逻辑由子类来实现
if (p == head && tryAcquire(arg)) {
//获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点
setHead(node);
//原来首节点下个节点指向为null
p.next = null; // help GC
//设置为获取成功
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //被park处
interrupted = true;
}
} finally {
// 尝试获取资源失败并执行异常,取消请求,将当前节点从队列中移除
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
//节点设置为头部
head = node;
//清空线程
node.thread = null;
//清空前驱节点
node.prev = null;
}
获取与释放资源
acquire用于线程获取资源(独占)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
虽然代码看上去很奇怪,但实际上逻辑就是:如果tryAcuqire
失败(获取资源失败),则执行第二个if判断条件,也就是将当前线程封装成Node节点并插入到AQS队列中,并调用LockSupport.park(this)
方法挂起自己,其中park方法位于上面的acquireQueued
方法中的parkAndCheckInterrupt()
部分
release用于释放资源(独占)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
当一个线程调用release方法时,会尝试使用tryRelease
操作释放资源(实际上是设置state的值),然后判断队列是否为null以及队头的waitStatus
是否为0,如果为0则说明AQS队列为空,如果不为0则要执行unparkSuccessor
方法,这个方法主要做的事情有:
- 将队头元素
waitStatus
设置为0 - 找到离队头最近的没有被cancel的节点(
waitStatus
小于等于0),并将其作为队头 - 将这个节点唤醒(之后这个节点会继续执行
acquireQueued
的逻辑)
共享式释放资源的思路也是类似的,只是有些常量不一样,这里就不放出来了
3. 条件队列的支持
Object 的 wait、notify 函数是配合 Synchronized 锁实现线程间同步协作的功能,AQS 的 ConditionObject
条件变量也提供这样的功能。通过 ConditionObject
的 await 和 signal 两类函数完成。
每个ConditionObject
都对应着一个条件队列,这个队列是带头尾节点的单链表
不同于 Synchronized 锁,一个 AQS 可以对应多个条件变量,而Synchronized只有一个,如下图
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//封装成node后进入condition队列
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//对队头第一个node执行signal操作
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// CAS失败则回到上面自旋
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//条件队列出队的节点会被添加到AQS队列中
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
逻辑总结来说就是:当某个线程执行了 ConditionObject
的 await 函数,阻塞当前线程。线程会被封装成 Node 节点添加到条件队列的末端,其他线程执行 ConditionObject
的 signal 函数,会将条件队列头部线程节点转移到 AQS 队列参与竞争资源.
和AQS队列不同的是,条件队列使用的是nextWaiter
作为下个节点的指针,而AQS队列使用的是next
和prev