概述
1. 简介
CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
CountDownLatch和CyclicBarrier的区别
(01) CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。
(02) CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。
2.UML类图
CountDownLatch的数据结构很简单,它是通过"共享锁"实现的。它包含了sync对象,sync是Sync类型。Sync是实例类,它继承于AQS。
CountDownLatch函数列表
//构造一个用给定计数初始化的 CountDownLatch。
CountDownLatch(int count)
// 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
void await()
// 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
boolean await(long timeout, TimeUnit unit)
// 递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
void countDown()
// 返回当前计数。
long getCount()
// 返回标识此锁存器及其状态的字符串。
String toString()
3.源码分析
CountDownLatch(int count)
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
说明:该函数是创建一个Sync对象,而Sync是继承于AQS类。Sync构造函数如下
Sync(int count) {
setState(count);
}
setState()在AQS中实现,源码如下:
protected final void setState(long newState) {
state = newState;
}
说明:在AQS中,state是一个private volatile long类型的对象。对于CountDownLatch而言,state表示的”锁计数器“。CountDownLatch中的getCount()最终是调用AQS中的getState(),返回的state对象,即”锁计数器“。
await()
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//说明:该函数实际上是调用的AQS的acquireSharedInterruptibly(1);
public final void acquireSharedInterruptibly(long arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
说明:acquireSharedInterruptibly()的作用是获取共享锁。
如果当前线程是中断状态,则抛出异常InterruptedException。否则,调用tryAcquireShared(arg)尝试获取共享锁;尝试成功则返回,否则就调用doAcquireSharedInterruptibly()。doAcquireSharedInterruptibly()会使当前线程一直等待,直到当前线程获取到共享锁(或被中断)才返回。
tryAcquireShared()在CountDownLatch.java中被重写,它的源码如下:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
说明:tryAcquireShared()的作用是尝试获取共享锁。如果"锁计数器=0",即锁是可获取状态,则返回1;否则,锁是不可获取状态,则返回-1。
private void doAcquireSharedInterruptibly(long arg)
throws InterruptedException {
// 创建"当前线程"的Node节点,且Node中记录的锁是"共享锁"类型;并将该节点添加到CLH队列末尾。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取上一个节点。
// 如果上一节点是CLH队列的表头,则"尝试获取共享锁"。
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// (上一节点不是CLH队列的表头) 当前线程一直等待,直到获取到共享锁。
// 如果线程在等待过程中被中断过,则再次中断该线程(还原之前的中断状态)。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
我们来仔细分析这个方法,线程 t3 经过第 1 步 第4行 addWaiter 入队以后,我们应该可以得到这个:
由于 tryAcquireShared 这个方法会返回 -1,所以 if (r >= 0) 这个分支不会进去。到 shouldParkAfterFailedAcquire 的时候,t3 将 head 的 waitStatus 值设置为 -1,如下:
然后进入到 parkAndCheckInterrupt 的时候,t3 挂起。
我们再分析 t4 入队,t4 会将前驱节点 t3 所在节点的 waitStatus 设置为 -1,t4 入队后,应该是这样的
然后,t4 也挂起。接下来,t3 和 t4 就等待唤醒了。
接下来,我们来看唤醒的流程,我们假设用 10 初始化 CountDownLatch。
总结
(01) addWaiter(Node.SHARED)的作用是,创建”当前线程“的Node节点,且Node中记录的锁的类型是”共享锁“(Node.SHARED);并将该节点添加到CLH队列末尾。关于Node和CLH在深入理解AQS(AbstractQueuedSynchronizer)已经详细介绍过,这里就不再重复说明了。
(02) node.predecessor()的作用是,获取上一个节点。如果上一节点是CLH队列的表头,则”尝试获取共享锁“。
(03) shouldParkAfterFailedAcquire()的作用和它的名称一样,如果在尝试获取锁失败之后,线程应该等待,则返回true;否则,返回false。
(04) 当shouldParkAfterFailedAcquire()返回ture时,则调用parkAndCheckInterrupt(),当前线程会进入等待状态,直到获取到共享锁才继续运行。doAcquireSharedInterruptibly()中的shouldParkAfterFailedAcquire(), parkAndCheckInterrupt等函数在深入理解AQS(AbstractQueuedSynchronizer)中介绍过,这里也就不再详细说明了。
countDown()
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// 只有当 state 减为 0 的时候,tryReleaseShared 才返回 true
// 否则只是简单的 state = state - 1 那么 countDown 方法就结束了
if (tryReleaseShared(arg)) {
// 唤醒 await 的线程
doReleaseShared();
return true;
}
return false;
}
// 这个方法很简单,用自旋的方法实现 state 减 1
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
//通过CAS将state的值减1,失败就不会进入return,继续for循环,直至CAS成功
if (compareAndSetState(c, nextc))
//state减到0就返回true,否则返回false
return nextc == 0;
}
}
countDown 方法就是每次调用都将 state 值减 1,如果 state 减到 0 了,那么就调用下面的方法进行唤醒阻塞队列中的线程:
// 调用这个方法的时候,state == 0
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// t3 入队的时候,已经将头节点的 waitStatus 设置为 Node.SIGNAL(-1) 了
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
// 在这里,也就是唤醒 t3 , t3的await()方法可以接着运行了
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo
continue; // loop on failed CAS
}
//此时 h == head 说明被唤醒的 t3线程 还没有执行到await()方法中的setHeadAndPropagate(node, r)这一步,则此时循环结束;
//如果执行完setHeadAndPropagate(node, r),则head就为t3了,这里的h和head就不相等,会继续循环
if (h == head) // loop if head changed
break;
}
}
一旦 t3 被唤醒后,我们继续回到 await 的这段代码,在第24行代码 parkAndCheckInterrupt 返回继续接着运行,我们先不考虑中断的情况:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//p表示当前节点的前驱节点
final Node p = node.predecessor();
//此时被唤醒的是之前head的后继节点,所以此线程的前驱节点是head
if (p == head) {
//此时state已经为0,r为1
int r = tryAcquireShared(arg);
if (r >= 0) {
// 2. 这里将唤醒t3的后续节点t4,以此类推,t4被唤醒后,会在t4的await中唤醒t4的后续节点
setHeadAndPropagate(node, r);
// 将已经唤醒的t3节点从队列中去除
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
// 1. 唤醒后这个方法返回
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
接下来,t3 会循环一次进到 setHeadAndPropagate(node, r) 这个方法,先把 head 给占了,然后唤醒队列中其他的线程:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
// 下面说的是,唤醒当前 node 之后的节点,即 t3 已经醒了,马上唤醒 t4
// 类似的,如果 t4 后面还有 t5,那么 t4 醒了以后,马上将 t5 给唤醒了
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 又是这个方法,只是现在的 head 已经不是原来的空节点了,是 t3 的节点了
doReleaseShared();
}
}
又回到这个方法了,那么接下来,我们好好分析 doReleaseShared 这个方法,我们根据流程,头节点 head 此时是 t3 节点了
// 调用这个方法的时候,state == 0
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// t4 将头节点(此时是 t3)的 waitStatus 设置为 Node.SIGNAL(-1) 了
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
// 在这里,也就是唤醒 t4
unparkSuccessor(h);
}
else if (ws == 0 &&
// 这个 CAS 失败的场景是:执行到这里的时候,刚好有一个节点入队,入队会将这个 ws 设置为 -1
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果到这里的时候,前面唤醒的线程已经占领了 head,那么再循环
// 否则,就是 head 没变,那么退出循环,
// 退出循环是不是意味着阻塞队列中的其他节点就不唤醒了?当然不是,唤醒的线程之后还是会在await()方法中调用此方法接着唤醒后续节点
if (h == head) // loop if head changed
break;
}
}
总结:CountDownLatch是通过“共享锁”实现的。在创建CountDownLatch中时,会传递一个int类型参数count,该参数是“锁计数器”的初始状态,表示该“共享锁”最多能被count给线程同时获取。当某线程调用该CountDownLatch对象的await()方法时,该线程会等待“共享锁”可用时,才能获取“共享锁”进而继续运行。而“共享锁”可用的条件,就是“锁计数器”的值为0!而“锁计数器”的初始值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将“锁计数器”-1;通过这种方式,必须有count个线程调用countDown()之后,“锁计数器”才为0,而前面提到的等待线程才能继续运行!以上,就是CountDownLatch的实现原理。
4. 使用示例
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
public class CountDownLatchTest1 {
private static int LATCH_SIZE = 5;
private static CountDownLatch doneSignal;
public static void main(String[] args) {
try {
doneSignal = new CountDownLatch(LATCH_SIZE);
// 新建5个任务
for(int i=0; i<LATCH_SIZE; i++)
new InnerThread().start();
System.out.println("main await begin.");
// "主线程"等待线程池中5个任务的完成
doneSignal.await();
System.out.println("main await finished.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class InnerThread extends Thread{
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " sleep 1000ms.");
// 将CountDownLatch的数值减1
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
运行结果:
main await begin.
Thread-0 sleep 1000ms.
Thread-2 sleep 1000ms.
Thread-1 sleep 1000ms.
Thread-4 sleep 1000ms.
Thread-3 sleep 1000ms.
main await finished.
最后
以上就是失眠哈密瓜为你收集整理的Java多线程 - CountDownLatch原理的全部内容,希望文章能够帮你解决Java多线程 - CountDownLatch原理所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复