概述
原文链接:Concurrent – 06 – CyclicBarrier源码解析
相关文章:
-
Concurrent – 01 – ThreadPoolExecutor源码解析
-
Concurrent – 02 – CAS源码解析
-
Concurrent – 03 – AQS源码解析
-
Concurrent – 04 – HashMap源码解析(JDK8)
-
Concurrent – 05 – CountDownLatch源码解析
-
Concurrent – 06 – CyclicBarrier源码解析
-
Concurrent – 07 – Semaphore源码解析
-
Concurrent – 08 – Exchanger用法解析
CyclicBarrier,即循环栅栏,是一个同步工具类,用于阻塞当前线程,等待其他线程,所有线程都必须同时到达栅栏位置后,才可以继续执行
当所有线程都到达栅栏位置后,可以触发执行一个预先设置好的任务
一、内部类解析
-
Generation
private static class Generation { // 标记当前代是否损坏 boolean broken = false; }
-
Generation 是 CyclicBarrier 的内部类,用于帮助 CyclicBarrier 进行代的控制
-
所谓代,可以这么理解,因为 CyclicBarrier 是可以重复使用的,所以当所有线程都通过了栅栏,就表示当前代已经过去了,进入了下一代,而下一代则会重新生成一个 Generation 对象,用于表示新的一代
-
二、字段解析
-
lock
private final ReentrantLock lock = new ReentrantLock();
- 通过栅栏所需要持有的锁
-
trip
private final Condition trip = lock.newCondition();
- 打开栅栏所需要的条件
-
parties
private final int parties;
- 通过栅栏的线程数
-
barrierCommand
private final Runnable barrierCommand;
- 额外任务 (当所有线程都到达栅栏处后,会触发执行)
-
generation
private Generation generation = new Generation();
- generation 是内部类 Generation 的一个实例
-
count
private int count;
-
计数器,在栅栏处等待的线程数
-
在每一代中,都会从 parties (通过栅栏的线程数) 递减至 0
-
当创建新的一代或者当前代破损时,会被重置为 parties (通过栅栏的线程数)
-
三、构造方法解析
-
CyclicBarrier(int parties, Runnable barrierAction)
public CyclicBarrier(int parties, Runnable barrierAction) { // 如果通过栅栏的线程数小于等于 0,则抛出异常 if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
-
CyclicBarrier(int parties)
public CyclicBarrier(int parties) { this(parties, null); }
四、方法解析
1、await() 相关方法
-
await()
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
-
阻塞当前线程,直到所有线程都到达栅栏处,才可以继续执行
-
返回当前线程到达栅栏处的索引,其中索引
getParties() - 1
表示第一个到达,索引 0 表示最后一个到达
-
-
dowait(boolean timed, long nanos)
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 上锁 lock.lock(); try { // 获取当前代 final Generation g = generation; // 如果当前代已破损,则抛出异常 if (g.broken) throw new BrokenBarrierException(); // 如果当前线程被中断 if (Thread.interrupted()) { /* * 将当前代损坏标志设为 true, * 并将 count 重置为 parties, * 同时唤醒所有在此 Condition 上注册的线程 */ breakBarrier(); // 抛出异常 throw new InterruptedException(); } // 获取线程索引值 int index = --count; /* * index == 0,表示所有线程都已经到达 * 栅栏处,此时栅栏可以开栅,让所有线程继续执行 */ if (index == 0) { // tripped // 标记任务是否正常执行 boolean ranAction = false; try { /* * 如果额外任务不为 null, * 则执行该额外任务 */ final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; /* * 唤醒所有在此 Condition 上注册的线程, * 并将 count 重置为 parties,同时生成下一代 */ nextGeneration(); return 0; } finally { // 如果任务没有正常执行 if (!ranAction) /* * 将当前代损坏标志设为 true, * 并将 count 重置为 parties, * 同时唤醒所有在此 Condition 上注册的线程 */ breakBarrier(); } } /* * 无限循环,直到栅栏开放、 * 或栅栏损坏、或线程中断、或超时 */ for (;;) { try { /* * 如果未设置超时,则阻塞当前线程 * (此处会调用 AQS 中的 await() 方法) */ if (!timed) trip.await(); /* * 如果设置了超时,则设置超时时间,并阻塞当前线程 * (此处会调用 AQS 中的 awaitNanos(long nanosTimeout) 方法) */ else if (nanos > 0L) nanos = trip.awaitNanos(nanos); // 如果当前线程被中断 } catch (InterruptedException ie) { // 如果当前线程属于代,且当前代未破损 if (g == generation && ! g.broken) { /* * 将当前代损坏标志设为 true, * 并将 count 重置为 parties, * 同时唤醒所有在此 Condition 上注册的线程 */ breakBarrier(); // 抛出异常 throw ie; /* * 如果当前线程不属于这一代, * 则不会影响这一代栅栏的执行逻辑, * 所以只要将当前线程标记为中断就好了 */ } else { Thread.currentThread().interrupt(); } } // 如果当前代已破损,则抛出异常 if (g.broken) throw new BrokenBarrierException(); /* * 正常换代,返回当前线程到达栅栏处的索引 * * 如果 g == generation,则说明还没有换代, * 但此时当前线程已被唤醒,这是因为一个线程可以使用多个栅栏, * 当别的栅栏唤醒了当前线程,流程也会走到这里,所以需要判断 * 是否为当前代,用于保证程序的正确性 */ if (g != generation) return index; // 如果设置了超时,且时间小于等于 0 if (timed && nanos <= 0L) { /* * 将当前代损坏标志设为 true, * 并将 count 重置为 parties, * 同时唤醒所有在此 Condition 上注册的线程 */ breakBarrier(); // 抛出异常 throw new TimeoutException(); } } } finally { // 解锁 lock.unlock(); } }
-
阻塞当前线程的核心方法
-
dowait 方法流程归纳
-
每个栅栏 (CyclicBarrier) 都会有一把锁 (ReentrantLock),栅栏 (CyclicBarrier) 上的所有线程,在调用
await()
方法时,都需要获得该栅栏的锁 -
线程每调用一次
await()
方法,count 计数器就会减 1,表示该线程已经到达栅栏处,并会调用 AQS 的await()
方法来阻塞该线程 -
如果在等待过程中,线程被中断,则抛出异常,这里需要注意的是,如果被中断的线程使用的栅栏 (CyclicBarrier) 不是当前代的 (如:在当前代最后一次线程调用
signalAll()
方法后,更新了代,在这个期间,该线程被中断,就会出现这种情况),程序会认为任务已经完成,不在乎线程被中断,只要将该线程标记为中断就好了 -
当 count 为 0 时,表示所有线程都已经到达栅栏处,此时会在所有线程中随机选取一个线程去执行在 CyclicBarrier 构造方法中设置的额外任务 (如果有的话),最后唤醒所有在此 Condition 上注册的线程,并将 count 重置为 parties,同时生成下一代
-
-
-
breakBarrier()
private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
- 将当前代损坏标志设为 true,并将 count 重置为 parties,同时唤醒所有在此 Condition 上注册的线程
-
nextGeneration()
private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
- 唤醒所有在此 Condition 上注册的线程,并将 count 重置为 parties,同时生成下一代
2、await(long timeout, TimeUnit unit) 相关方法
-
await(long timeout, TimeUnit unit)
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
- 阻塞当前线程,直到所有线程都到达栅栏处或超过了指定的等待时间,才可以继续执行
3、getParties() 相关方法
-
getParties()
public int getParties() { return parties; }
- 获取将到达栅栏的线程数
4、isBroken() 相关方法
-
isBroken()
public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } }
- 查询当前代是否已破损
5、reset() 相关方法
-
reset()
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
- 重置当前栅栏,如果有线程正在栅栏处等待,则会抛出异常
6、getNumberWaiting() 相关方法
-
getNumberWaiting()
public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } }
- 获取当前在栅栏处等待的线程数
五、举例说明
-
CyclicBarrierTest.java
public class CyclicBarrierTest { private void go(CyclicBarrier cyclicBarrier) { // 创建三个工作线程,并依次启动 new Thread(new CyclicBarrierTask(cyclicBarrier), "Thread1").start(); new Thread(new CyclicBarrierTask(cyclicBarrier), "Thread2").start(); new Thread(new CyclicBarrierTask(cyclicBarrier), "Thread3").start(); } public static void main(String[] args) throws InterruptedException { CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> System.out.println("线程【" + Thread.currentThread().getName() + "】执行额外任务:【Hello World】")); new CyclicBarrierTest().go(cyclicBarrier); TimeUnit.SECONDS.sleep(1); System.out.println("================="); new CyclicBarrierTest().go(cyclicBarrier); // 线程【Thread1】已经达到 // 线程【Thread3】已经达到 // 线程【Thread2】已经达到 // 线程【Thread2】执行额外任务:【Hello World】 // 线程【Thread2】开始执行 // 线程【Thread1】开始执行 // 线程【Thread3】开始执行 // ================= // 线程【Thread1】已经达到 // 线程【Thread2】已经达到 // 线程【Thread3】已经达到 // 线程【Thread3】执行额外任务:【Hello World】 // 线程【Thread3】开始执行 // 线程【Thread1】开始执行 // 线程【Thread2】开始执行 } } class CyclicBarrierTask implements Runnable { private CyclicBarrier cyclicBarrier; public CyclicBarrierTask(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("线程【" + Thread.currentThread().getName() + "】已经达到"); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程【" + Thread.currentThread().getName() + "】开始执行"); } }
- 如上所示,调用
await()
方法会阻塞当前线程 (工作线程),使其在栅栏处一直处于等待状态,待所有工作线程都到达栅栏处 (此时计数器为 0),此时会在所有线程中随机选取一个线程去执行在 CyclicBarrier 构造方法中设置的额外任务 (如果有的话),最后恢复所有工作线程,使其继续执行
- 如上所示,调用
六、CountDownLatch 和 CyclicBarrier 的区别
-
CountDownLatch
-
只可以使用一次
-
让某个线程等待其他多个线程执行结束后 (锁存器计数为 0) ,再开始执行
-
-
CyclicBarrier
-
可以使用多次
-
多个线程之间互相等待,当所有线程都处于同一状态时 (都位于栅栏处),再开始执行
-
可以在栅栏处添加额外任务 (当所有线程到达栅栏处执行)
-
七、参考资料
-
并发编程之 CyclicBarrier 源码分析
-
Concurrent – 03 – AQS底层实现原理
最后
以上就是危机煎蛋为你收集整理的Concurrent -- 06 -- CyclicBarrier源码解析的全部内容,希望文章能够帮你解决Concurrent -- 06 -- CyclicBarrier源码解析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复