1. 初步介绍
我们在使用多线程时,经常会有需要让主线程等待多个子线程执行完毕后再进行最后处理的操作。使用Join方法可以来实现这一点。不过除了join方法之外,JUC中还有一个叫CountDownLatch
的类,用这个类来实现也会更加的优雅和更加的灵活,比如我们看下面这个示例
public class CDLTest {
private static volatile CountDownLatch countDownLatch = new CountDownLatch(2);
public static void main(String[] args) {
//建立两个线程命名为t1、t2
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println("t1 over");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
});Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println("t2 over");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}
});
//正式逻辑部分,启动子线程
t1.start();
t2.start();
System.out.println("waiting");
try {
countDownLatch.await(); //等待子线程执行完毕,返回
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("all over");
}
}
打印结果:
waiting
t1 over
t2 over
all over
在上面的代码中,我们创建了一个传参为2的CountDownLatch
,主线程在调用await方法之后会被阻塞,一直到子线程都执行完毕
其中,CountDownLatch
和join的一个区别是:调用一个子线程的join方法后,该线程会被一直阻塞到子线程运行完毕,而CountDownLatch
可以按照需要,在子线程的代码段中来唤醒主线程。并且使用了计数器的方式可以让多个线程一起来操作。
2. 原理探究
CountDownLatch
,顾名思义,里面应该是有一个计数器的,而且计数器是执行countdown操作的,那么接下来我们带着问题通过源码来看一下这个的具体实现
- 何时初始化计数器
- 何时递减计数器
- 当计数器变为0时做了什么操作
- 多个线程时如何通过计数器同步的
构造方法
首先看一下类图
可以看出它也是使用AQS实现的,并且和ReentrantLock
一样的思路使用了内部类来实现具体功能,接下来看一下构造函数和Sync
public class CountDownLatch {
private final Sync sync;
private volatile int state;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
//先省略其他的...
}
通过构造函数,实际上是把计数器的值赋给了AQS的状态变量state,也就是使用AQS的state来表示计数器的值
await方法
当线程调用CountDownLatch
对象的await方法之后,当前线程会被阻塞,直到下面的情况之一才会返回
- 当计数器的值为0时(其他线程通过调用
countDown()
方法来减小值) - 其他线程调用了当前线程的interrupt()方法时,此时会抛出中断异常
public void await() throws InterruptedException {
//委托继承了AQS的sync来执行
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//如果线程被中断则抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//查看当前计数器是否为0,为0则直接返回,否则调用AQS的方法使其进入AQS的阻塞队列中
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
可以看到调用tryAcquireShared
方法并没有让state减1,而仅仅是检查当前state是不是为0
countdown方法
线程调用该方法之后,计数器的值减一,如果减完后计数器为0则唤醒被await方法阻塞的所有线程
public void countDown() {
//委托sync来做,sync调用了父类也就是AQS的方法
sync.releaseShared(1);
}
//这是AQS中的方法
public final boolean releaseShared(int arg) {
//AQS则回去调用sync实现的tryReleaseShared方法
if (tryReleaseShared(arg)) {
//AQS的唤醒线程的方法
doReleaseShared();
return true;
}
return false;
}
//sync的方法
protected boolean tryReleaseShared(int releases) {
//自旋进行CAS,直到成功使用CAS将state减一并更新state
for (;;) {
int c = getState();
//状态值为0则直接返回,也就是让countdown直接返回
if (c == 0)
return false;
//使用CAS来完成减一操作
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
其中,最后面的return nextc == 0
的含义是,如果state变为0了,则这是最后一个调用countdown的方法,此时需要唤醒所有被阻塞的进程(使用上面的doReleaseShared()
)方法
3. 小结
CountDownLatch
是基于AQS实现的,使用AQS的state来存放计数器的值。在初始化时则开始设置计数器的值,每当线程调用countdown方法时实际上是在使用CAS来递减state的值。
当线程调用await方法之后,会被放入AQS的阻塞队列中并等待state为0时再返回。其他线程调用countdown方法让state每次减一,当state变为0时,则需要再调用AQS的doReleaseShared
方法来激活AQS阻塞队列中的线程。
文字如刀,剖开表象直抵本质。