概述
1、CyclicBarrier 简介
从字面上的意思可以知道,这个类的中文意思是“循环栅栏”。大概的意思就是一个可循环利用的屏障。
它的作用就是会让所有线程都等待完成后才会继续下一步行动。
举个例子,就像是旅游团,有的人参观完景点 先回到大巴车上,有的人可能晚点才回到大巴车,但只有人都到齐了,大巴车才能开往下一个景点。这里的人就是各个线程,大巴车就是 CyclicBarrier。
这个栅栏就是大巴车,各种颜色的先就是不同的线程代表人。不同的线程是不同时间到达的栅栏
CountDownLatch | CyclicBarrier |
---|---|
减计数方式 | 加计数方式 |
计算为0时释放所有等待的线程 | 计数达到指定值时释放所有等待线程 |
计数为0时,无法重置 | 计数达到指定值时,计数置为0重新开始 |
调用countDown()方法计数减一,调用await()方法只进行阻塞,对计数没任何影响 | 调用await()方法计数加1,若加1后的值不等于构造方法的值,则线程阻塞 |
不可重复利用 | 可重复利用 |
2、入门案例
Demo1
现在有五个玩家(对应五个线程),只有所有玩家都加载好后(都调用 await方法),玩家们才会进入游戏
public class CyclicBarrierTest01 {
/**
* 案例:
* 模拟过气游戏 “王者荣耀” 游戏开始逻辑
*/
public static void main(String[] args) {
//第一步:定义玩家,定义5个
String[] heros = {"安琪拉","亚瑟","马超","张飞", "刘备"};
//第二步:创建固定线程数量的线程池,线程数量为5
ExecutorService service = Executors.newFixedThreadPool(5);
//第三步:创建barrier,parties 设置为5
CyclicBarrier barrier = new CyclicBarrier(5);
//第四步:通过for循环开启5任务,模拟开始游戏,传递给每个任务 英雄名称和barrier
for(int i = 0; i < 5; i++) {
service.execute(new Player(heros[i], barrier));
}
service.shutdown();
}
static class Player implements Runnable {
private String hero;
private CyclicBarrier barrier;
public Player(String hero, CyclicBarrier barrier) {
this.hero = hero;
this.barrier = barrier;
}
@Override
public void run() {
try {
//每个玩家加载进度不一样,这里使用随机数来模拟!
TimeUnit.SECONDS.sleep(new Random().nextInt(10));
System.out.println(hero + ":加载进度100%,等待其他玩家加载完成中...");
barrier.await();
System.out.println(hero + ":发现所有英雄加载完成,开始战斗吧!");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
执行结果:
安琪拉:加载进度100%,等待其他玩家加载完成中...
马超:加载进度100%,等待其他玩家加载完成中...
刘备:加载进度100%,等待其他玩家加载完成中...
张飞:加载进度100%,等待其他玩家加载完成中...
亚瑟:加载进度100%,等待其他玩家加载完成中...
亚瑟:发现所有英雄加载完成,开始战斗吧!
安琪拉:发现所有英雄加载完成,开始战斗吧!
马超:发现所有英雄加载完成,开始战斗吧!
张飞:发现所有英雄加载完成,开始战斗吧!
刘备:发现所有英雄加载完成,开始战斗吧!
3、CyclicBarrier源码分析
属性
//因为barrier 实现是依赖于 Condition 条件队列的,condition条件队列必须依赖Lock才能使用
private final ReentrantLock lock = new ReentrantLock();
//线程挂起实现使用的condition队列,条件:当前代所有线程到位,这个条件队列 才会被唤醒
private final Condition trip = lock.newCondition();
//Barrier 需要参与进来的线程数量
private final int parties;
//当前代,最后一个线程到位的线程 需要执行的事情
private final Runnable barrierCommand;
//表示Barrier对象 当前 “代”
private Generation generation = new Generation();
//表示当前“代”还有多少个线程未到位
//初始值就是parties
private int count;
构造方法
一共两个构造方法,一个参数的只指定 parties 值,parties 是参与线程的个数
第二个构造方法有一个 Runnable 参数,这个参数的意思是最后一个到达线程要做的任务
public CyclicBarrier(int parties) {
this(parties, null);
}
//Runnable 里面定义的方法会在最后一个线程到达后再执行,执行完成后,栅栏放开
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
内部类Generation “代”
表示 “代” 这个概念,就是栅栏
private static class Generation {
//表示当前"代"是否被打破(打破:所有线程都已经到达栅栏),如果代被打破,那么再来到这一代的线程,就会直接抛出BrokenException异常
//且在这一代 挂起的线程 都会被唤醒,然后抛出 BrokenException
boolean broken = false;
}
nextGeneration() 方法
开启下一代,当这一代 所有的线程到位后(假设barrierAction不为null,还需最后一个线程执行完事件才会调用这个方法), 会调用nextGeneration()开启新的一代
private void nextGeneration() {
//将在trip条件队列内挂起的线程 全部唤醒
// signal completion of last generation
trip.signalAll();
//重置count 为 parties
// set up next generation
count = parties;
//开启新的一代,使用新的Generation 对象,表示新的一代和上一代没有任何关系
generation = new Generation();
}
breakBarrier() 方法
打破barrier 屏障,在屏障内的线程 都会抛出异常
打破当前代,并唤醒其它节点,仅在持锁状态使用
private void breakBarrier() {
//将代中的broken设置为 true,表示这一代是被打破了的,再来到这一代的线程,直接抛出异常
generation.broken = true;
//重置count为parties
count = parties;
//将在trip条件队列内挂起的线程 全部唤醒,唤醒后的线程 会检查当前代 是否是打破的,
//如果是打破的话,接下来的逻辑 和 开启下一代 唤醒的逻辑不一样
trip.signalAll();
}
await() 方法
await 有两个方法,一个是无参的方法,另一个是超时方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
public int await(long timeout, TimeUnit unit) throws InterruptedException,
BrokenBarrierException, TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
dowait( boolean timed, long nanos ) 方法
await 的两个方法内部都调用的是这个方法
timed:表示当前调用await 方法的线程是否指定了 超时时长,如果true 表示线程是响应超时的
nanos:表示线程等待超时时长 纳秒,如果timed == false,nanos==0
-
非最后一个线程走下面的for循环,这个线程会 释放掉lock,然后进入到trip条件队列的尾部,然后挂起自己,等待被唤醒,当它们被唤醒的时候已经更新换“代”了,这个时候返回index。
-
最后一个线程打破栅栏,调用nextGeneration() 方法,让所有条件队列中的线程 加入到 阻塞队列,等待被唤醒,进入下一代。
-
如果线程被中断标记 或 超时,则会走抛出异常的逻辑:先打破栅栏,再抛出对应的异常。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//获取barrier 全局对象
final ReentrantLock lock = this.lock;
//加锁
//为什么要加锁呢?
//因为barrier 的挂起 和 唤醒 依赖的组件是 condition
lock.lock();
try {
//获取barrier当前的"代"
final Generation g = generation;
//如果当前代是已经被打破状态,则当前调用await方法的线程,直接抛出Broken异常
if (g.broken)
throw new BrokenBarrierException();
//如果当前线程的中断标识位为true,则打破当前代,然后当前线程抛出中断异常
if (Thread.interrupted()) {
//1. 设置当前代的状态为broken状态
//2. 唤醒在trip 条件队列内的线程
breakBarrier();
throw new InterruptedException();
}
//执行到这里,说明当前线程中断状态是正常的 false,当前代的broken为 false,(未打破状态)
//正常逻辑..
//假设 parties 给的是 5,index对应的值就是 4,3,2,1,0
int index = --count;
//条件成立:说明当前线程是最后一个到达barrier的线程,此时需要做什么呢?
if (index == 0) { // tripped
//标记:true 表示 最后一个线程 执行cmd时未抛出异常; false:表示最后一个线程执行cmd时抛出异常
//cmd 就是创建 barrier对象时,指定的第二个Runnable接口实现,这个库为null
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
//条件成立:说明创建barrier对象时,指定Runnable接口了,这个时候最后一个到达的线程 就需要执行这个接口
if (command != null)
command.run();
//command.run() 未抛出异常的话,那么线程会执行到这里、
ranAction = true;
//开启新的一代
//1.唤醒trip条件队列内挂起的线程,被唤醒的线程 会依次获取到lock,然后依次退出await方法
//2.重置count 为parties
//3.创建一个新的generation对象,表示新的一代
nextGeneration();
//返回0,因为当前线程是此代最后一个到达的线程,所以index==0
return 0;
} finally {
//command.run() 执行抛出异常的话,那么线程会执行到这里、
if (!ranAction)
breakBarrier();
}
}
//执行到这里,说明当前线程并不是最后一个到达Barrier的线程..此时需要进入一个自旋中
// loop until tripped, broken, interrupted, or timed out
//自旋,一直到 条件满足,当前代被打破,线程被中断,等待超时
for (;;) {
try {
//条件成立:说明当前线程是不指定超时时间的
if (!timed)
//当前线程 会 释放掉lock,然后进入到trip条件队列的尾部,然后挂起自己,等待被唤醒
trip.await();
else if (nanos > 0L)
//说明当前线程调用await方法时,是指定了超时时间的!
//超过指定时间,从条件队列 转移到 阻塞队列
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//抛出中断异常,会进来这里
//说明时候会抛出InterruptedException 异常呢?
//Node节点 在条件队列内 时 收到中断信号时 会抛出中断异常!
//条件一成立:说明当前代并没有发生变化
//条件二:当前代如果没有被打破,那么当前线程就去打破,并且抛出异常
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
//执行到else 有几种情况?
//1.代发生了变化,这个时候就不需要抛出中断异常了,因为 代已经更新了,这里唤醒后就走正常逻辑了。。只不过设置下 中断标记
//2.代没有发生变化,但是代被打破了,此时也不用返回中断异常,执行到下面的时候抛出 BrokenBarrierException异常,也记录下中断标记位
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
//唤醒后,执行到这里,有几种情况?
//1. 正常情况,当前Barrier 开启了新一代 (trip.signalAll())
//2. 当前Generation 被打破,此时也会唤醒所有在trip上挂起的线程
//3. 当前线程trip 中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒
//条件成立:当前代已经被打破
if (g.broken)
//线程唤醒后一次抛出BrokenBarrierException异常
throw new BrokenBarrierException();
//唤醒后,执行到这里,有几种情况?
//1. 正常情况,当前Barrier 开启了新一代 (trip.signalAll())
//3. 当前线程trip 中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒
//条件队列:说明当前线程挂起期间,最后一个线程到位了,然后触发了开启新的一代的逻辑,此时唤醒trip条件队列内的线程。
if (g != generation)
//返回当前线程的index
return index;
//唤醒后,执行到这里,有几种情况?
//3.当前线程trip中等待超时,然后主动转移到 阻塞队列 然后获取到锁 唤醒
if (timed && nanos <= 0L) {
//打破Barrier
breakBarrier();
//抛出超时异常
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
4、总结
最后
以上就是谨慎铅笔为你收集整理的AQS_CyclicBarrier源码解析1、CyclicBarrier 简介2、入门案例3、CyclicBarrier源码分析4、总结的全部内容,希望文章能够帮你解决AQS_CyclicBarrier源码解析1、CyclicBarrier 简介2、入门案例3、CyclicBarrier源码分析4、总结所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复