我是靠谱客的博主 糊涂鼠标,最近开发中收集的这篇文章主要介绍CountDownLatch1  前言 2  常用方法3 示例4 解析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

目录

1  前言 

2  常用方法

3 示例

4 解析

4.1 countDown()

4.2 await() 源码

1  前言 

countDownLatch( 门阀、 计数器)是多线程控制的一种工具 ,它用来协调各个线程之间的同步。

countDownLatch相当于一个计数器,能够使一个线程等待另外一些线程完成各自的工作后,再继续执行。这个计数器的初始值就是线程的数量,每当一个线程完成之后,计数器就进行减1,当计数器的值为0时,那么在countDownLatch上等待的线程就可以继续执行。

countDownLatch接收一个int类型的参数,表示要等待的工作线程个数

public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

2  常用方法

方法说明
await()使当前线程进入同步队列进行等待,直到latch的值被减到0或者当前线程被中断,当前线程就会被唤醒。
await(long timeout, TimeUnit unit)带超时时间的await()
countDown()使latch的值减1,如果减到了0,则会唤醒所有等待在这个latch上的线程
getCount()获得latch的数值

3 示例

让5个子线程的任务执行完成之后再执行主线程的任务。

public class CountDownLatchDemo {
private static final int THRED_NUM = 5;
public static void main(String[] args) {
//创建固定线城市数量的线程池
ExecutorService pool = Executors.newFixedThreadPool(THRED_NUM);
//如果有n个线程 就指定CountDownLatch的计数器为n
CountDownLatch countDownLatch = new CountDownLatch(THRED_NUM);
for (int i = 0; i < THRED_NUM; i++) {
pool.execute(()->{
try {
System.out.println("子线程:"+Thread.currentThread().getName()+"开始执行");
//模拟每个线程处理业务,耗时一秒钟
Thread.sleep(1000);
System.out.println("子线程:"+Thread.currentThread().getName()+"执行完成");
//当前线程调用此方法,则计数减1
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
});
}
//阻塞当前线程,直到计数器的值为0,主线程才开始处理
try {
countDownLatch.await();
System.out.println("等待子线程完成:,主线程"+Thread.currentThread().getName()+"开始执行,此时countDownLatch的计数器为0");
} catch (Exception e) {
e.printStackTrace();
}
//销毁线程池
pool.shutdown();
}
}

运行结果:

 注意:主线程的输出语句是在子线程结束之后再进行输出。

4 解析

CountDownLatch通过内部类Sync来实现同步语义,而Sync又继承了AQS。


private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// 设置同步状态的值
Sync(int count) {
setState(count);
}
// 获取同步状态的值
int getCount() {
return getState();
}
// 尝试获取同步状态,只有同步状态的值为0的时候才成功
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 尝试释放同步状态,每次释放通过CAS将同步状态的值减1
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
// 如果同步状态的值已经是0了,不要再释放同步状态了,也不要减1了
if (c == 0)
return false;
// 减1
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

4.1 countDown()

countDown()的源码如下:


public void countDown() {
sync.releaseShared(1);
}

调用的是AQSreleaseShared(int arg)方法:


public final boolean releaseShared(int arg) {
// 尝试释放同步状态
if (tryReleaseShared(arg)) {
// 如果成功,进入自旋,尝试唤醒同步队列中头结点的后继节点
doReleaseShared();
return true;
}
return false;
}

通过tryReleaseShared(arg)尝试释放同步状态,具体的实现被Sync重写了,源码:


protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
// 同步状态值减1
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

如果同步状态值减到0,则释放成功,进入自旋,尝试唤醒同步队列中头结点的后继节点,调用的是AQSdoReleaseShared()函数:


private void doReleaseShared() {
for (;;) {
// 获取头结点
Node h = head;
if (h != null && h != tail) {
// 获取头结点的状态
int ws = h.waitStatus;
// 如果是SIGNAL,尝试唤醒后继节点
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue;
// loop to recheck cases
// 唤醒头结点的后继节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue;
// loop on failed CAS
}
if (h == head)
// loop if head changed
break;
}
}

这里调用了unparkSuccessor(h)去唤醒头结点的后继节点。

4.2 await() 源码

await()源码如下:


public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

调用的是AQSacquireSharedInterruptibly(int arg)方法:


public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 如果被中断,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取同步状态
if (tryAcquireShared(arg) < 0)
// 获取同步状态失败,自旋
doAcquireSharedInterruptibly(arg);
}

首先,通过tryAcquireShared(arg)尝试获取同步状态,具体的实现被Sync重写了,查看源码:


protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

如果同步状态的值为0,获取成功。这就是CountDownLatch的机制,尝试获取latch的线程只有当latch的值减到0的时候,才能获取成功。

如果获取失败,则会调用AQS的doAcquireSharedInterruptibly(int arg)函数自旋,尝试挂起当前线程:
 


private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 将当前线程加入同步队列的尾部
final Node node = addWaiter(Node.SHARED);
try {
// 自旋
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果前驱节点是头结点,则尝试获取同步状态
if (p == head) {
// 当前节点尝试获取同步状态
int r = tryAcquireShared(arg);
if (r >= 0) {
// 如果获取成功,则设置当前节点为头结点
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
// 如果当前节点的前驱不是头结点,尝试挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}

这里,调用shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() 挂起当前线程。

 参考文章:CountDownLatch详解_西瓜游侠的博客-CSDN博客_countdownlatch

最后

以上就是糊涂鼠标为你收集整理的CountDownLatch1  前言 2  常用方法3 示例4 解析的全部内容,希望文章能够帮你解决CountDownLatch1  前言 2  常用方法3 示例4 解析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(51)

评论列表共有 0 条评论

立即
投稿
返回
顶部