概述
1.CountDownLatch作用
CountDownLatch 是一个同步辅助类,即倒数计数器。在完成一组正在其他线程中执行的操作前,它允许一个或者多个线程处于等待状态,在当前计数到达零前,await方法会一直阻塞,之后会释放所有等待的线程, await的所有后续调用都会立即返回
CountDownLatch的主要方法如下所示,
public class CountDownLatch {
private final Sync sync;
//根据给定的大于0的计数初始化,
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
//使当前线程在锁存器倒计时至零之前一直等待
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//使当前线程在锁存器倒计时至零之前一直等待,除非线程中断或者超过参数指定的时间
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//递减锁存器的技术, 如果为0,释放所有等待的线程
public void countDown() {
sync.releaseShared(1);
}
//返回当前记数
public long getCount() {
return sync.getCount();
}
//......
}
2.CountDownLatch使用场景
CountDownLatch在很多场景都适合使用,最常使用下面的两个场景
- 需要等待某个条件达到后才能做后面的事情;比如文件读取,几个线程分别读取文件,当所有线程都处理完后才表示文件读取完成, 才能继续做后面数据对比的事情
- 几个线程同时完成后才会触发某个事件; 比如分5个线程上传文件, 集水器为5,每个线程处理完后计数器减1,当计数器为0的时候表示所有线程都上传文件成功,
如下面的示例
- MyTask 类用于模拟任务,每个任务执行时间为一个3秒内的随机数,每个任务都共享同一个CountDownLatch 对象
- CountDownLatchTest 为测试类,计数器为10,循环执行10个任务,调用await()方法的时候,主线程等待,随着没完成一个任务计数器减一,当为0的时候,主线程继续执行
import java.util.Random;
import java.util.concurrent.CountDownLatch;
//任务
public class MyTask implements Runnable{
private final int count;
private final CountDownLatch doneLatch;
private static final Random randome=new Random(314159);
public MyTask(CountDownLatch downLatch,int i){
this.doneLatch=downLatch;
this.count =i;
}
public void run() {
doTask();
//任务完成计数-1
doneLatch.countDown();
}
private void doTask() {
String name=Thread.currentThread().getName();
System.out.println(name+":Mytask i="+ count);
try {
Thread.sleep(randome.nextInt(3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name+":Mytask end i="+ count);
}
}
import java.util.concurrent.*;
public class CountDownLatchTest {
private static final int TASKS=10;
public static void main(String[] args){
System.out.println("Begin");
ExecutorService service= Executors.newFixedThreadPool(5);
CountDownLatch doneLatch=new CountDownLatch(TASKS);
for (int t = 0; t < TASKS; t++) {
service.execute(new MyTask(doneLatch,t));
}
try {
//打印日志开始等待
System.out.println("waiting");
doneLatch.await();
//打印日志用于表示结束阻塞,并显示当前计数的数值
System.out.println("continue:"+doneLatch.getCount());
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
service.shutdown();
System.out.println("End");
}
}
}
执行结果:从结果上看, 当所有任务都执行完成后,计数的数值为0时,主线程才会继续执行
Begin
waiting
pool-1-thread-3:Mytask i=2
pool-1-thread-5:Mytask i=4
pool-1-thread-1:Mytask i=0
pool-1-thread-2:Mytask i=1
pool-1-thread-4:Mytask i=3
pool-1-thread-2:Mytask end i=1
pool-1-thread-2:Mytask i=5
pool-1-thread-5:Mytask end i=4
pool-1-thread-5:Mytask i=6
pool-1-thread-5:Mytask end i=6
pool-1-thread-5:Mytask i=7
pool-1-thread-2:Mytask end i=5
pool-1-thread-2:Mytask i=8
pool-1-thread-3:Mytask end i=2
pool-1-thread-3:Mytask i=9
pool-1-thread-5:Mytask end i=7
pool-1-thread-4:Mytask end i=3
pool-1-thread-1:Mytask end i=0
pool-1-thread-2:Mytask end i=8
pool-1-thread-3:Mytask end i=9
continue:0
End
3.CountDownLatch实现原理
3.1 AQS
从 CountDownLatch源码分析,CountDownLatch的核心实现机制是利用Sync对象的状态来实现的, AbstractQueuedSynchronizer 的简称为AQS,
public class CountDownLatch {
private final Sync sync;
//设置计数器初始的数值,并创建Sync对象
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
//设置状态的数值为计数器的数值
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
//覆盖AQS的释放状态,实现自己的逻辑,来消减count的数值,直到0
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
}
这里要注意一点CountDownLatch继承的父类是AbstractQueuedSynchronizer,此类被设计为大多数类型的同步器的父类,这些同步器依赖于单个原子int值来表示状态,子类可以重写等方法维护这个状态变量(statue),但只有使用getState、setState和compareAndSetState方法操作的原子更新的int类型的状态变量(statue)
父类AbstractQueuedSynchronizer属性state是通过valatile修饰,在多线程并发的情况下,某一个线程改变了任务的状态,其他线程都能够立马知道,用此方式保证了state字段的可见性
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//volatile 修饰,数值是线程见可见
private volatile int state;
//返回同步状态的当前值
protected final int getState() {
return state;
}
//设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//...
}
3.2 阻塞
了解了AbstractQueuedSynchronizer下面看看具体是如何实现的阻塞的,之前提到当调用CountDownLatch#await()方法开始阻塞,查看await方式是通过sync实现的,
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
从下方源码看acquireSharedInterruptibly方法可以被中断,所以方法中首先检查中断状态,然后调用 tryAcquireShared,此方法需AbstractQueuedSynchronizer子类自己实现,当返回值小于0的时候进入阻塞
从Sync源码可知Sync对象实现tryAcquireShared方法的处理逻辑为当状态值为0的时候返回1否则返回 -1,所以计数器的值到达0前,判断条件 -1<0 始终成立,线程都会调用方方法doAcquireSharedInterruptibly(arg),进入阻塞队列
private static final class Sync extends AbstractQueuedSynchronizer {
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
最后看doAcquireSharedInterruptibly实现源码
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//创建SHARED节点,并加入队尾,如果队列还没有初始化,那么先初始化队列
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
// 入队之后, 可能这个时候 计数器在该线程入队的过程已经被减到0了,
final Node p = node.predecessor();
if (p == head) {
// 第一次进来 ,先抢一次锁
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
//阻塞当前线程
//parkAndCheckInterrupt方法通过LockSupport.park禁用线程
//线程进入休眠状态
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
//如果被打断的话,会抛出异常,结束阻塞状态
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
3.3 唤醒
doAcquireSharedInterruptibly循环处理中使用会调用tryAcquireShared尝试释放,当计数器为0的时候,执行setHeadAndPropagate 设置队列头,并检查后续队列是否还需在共享模式下等待,如果需要在共享模式的释放动作,调用doReleaseShared发出后续信号并确保传播
private void doReleaseShared() {
for (;;) {
//头结点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒头结点的下一个节点,就是第一个被await方法阻塞住的线程节点
// 通过 LockSupport.unpark实现唤醒,如果线程在park上被阻塞,那么它将解除阻塞
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
这个时候,在之前代码阻塞住的现车干就会被唤醒, doAcquireSharedInterruptibly循环处理将所有阻塞的线程唤醒
上一篇:JAVA多线程FutureTask作用
最后
以上就是平淡毛衣为你收集整理的JAVA多线程同步计数器CountDownLatch作用的全部内容,希望文章能够帮你解决JAVA多线程同步计数器CountDownLatch作用所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复