我是靠谱客的博主 甜美白开水,最近开发中收集的这篇文章主要介绍07.CyclicBarrier源码解析CyclicBarrier源码解析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

CyclicBarrier源码解析

属性说明与静态内部类

/*
 * 表示代这个概念
 */
private static class Generation {
    // 表示当前代是否被打破,如果被打破,那么再来到这一代的线程,就会直接抛出BrokenBarrierException异常
    // 且在这一代挂起的线程,都会被唤醒,然后抛出BrokenBarrierException
    boolean broken = false;
}
// 因为barrier的实现是依赖于Condition的所以需要ReentrantLock
private final ReentrantLock lock = new ReentrantLock();
// 线程挂起唤醒使用的条件队列,当前代所有线程到位这个条件队列内的线程就都会被唤醒
private final Condition trip = lock.newCondition();
// Barrier需要参与进来的线程数量
private final int parties;
// 就是最后一个线程来之后在唤醒所有线程之前执行的任务
private final Runnable barrierCommand;
// 表示当前代
private Generation generation = new Generation();
// 表示当前代还有多少个线程未就绪,初始值是parties
private int count;

非核心方法解析

构造方法

1.源代码

    public CyclicBarrier(int parties, Runnable barrierAction) {
        // 需要参与的线程数量不能小于等于0
        if (parties <= 0) throw new IllegalArgumentException();
        // 初始化parties
        this.parties = parties;
        // 初始化couunt为parties
        this.count = parties;
        // 初始化barrierCommand
        this.barrierCommand = barrierAction;
    }

2.入参说明

  • parties: 需要参与的线程数量
  • barrierAction: 当前代最后一个线程唤醒条件队列中所有线程之前要执行的任务可以为null

nextGeneration方法

1.源代码

    private void nextGeneration() {
        // 唤醒所有挂起的线程
        trip.signalAll();
        // 重置count为parties
        count = parties;
        // 开启新的一代
        generation = new Generation();
    }

2.方法说明

该方法用于开启下一代,当这一代所有线程到位后,会调用这个方法开启下一代,如果barrierCommand不为空还需要执行这个。

breakBarrier方法

1.源代码

    private void breakBarrier() {
        // 设置代的broken为true
        generation.broken = true;
        // 重置couunt为parties
        count = parties;
        // 唤醒条件队列中的所有线程
        trip.signalAll();
    }

2.方法说明

打破Barrier屏障,在屏障内的线程,都会抛出异常

核心方法解析

1.await流程

1.1:await方法

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); 
        }
    }

1.2:dowait方法

    /**
     * 入参说明:
     *      timed:调用了await方法的线程是否指定了超时
     *      nanos:超时时间
     * 返回值:返回当前的count
     *
     *
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        // 获取全局锁
        final ReentrantLock lock = this.lock;
        // 加锁,因为依赖与Condition所以需要加锁
        lock.lock();
        try {
            // 获取代
            final Generation g = generation;
            // 如果代已经被打破直接抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
            // 如果当前先线程收到中断信号则执行breakBarrier方法和抛出中断异常
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            // 计算新的count
            int index = --count;
            // 条件成立:说明当前线程是最后一个到达Barrier的线程
            if (index == 0) {   
                // 标记:true表示最后一个线程执行cmd时候没有异常,false表示有异常
                boolean ranAction = false;
                try {
                    // 获取任务
                    final Runnable command = barrierCommand;
                    // 如果任务不为空则执行它
                    if (command != null)
                        command.run();
                    // 表示执行任务没有异常    
                    ranAction = true;
                    // 开启新一代
                    nextGeneration();
                    return 0;
                } finally {
                    // 如果执行任务出了异常那么去执行打破屏障的方法
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // 自旋,直到,条件满足,当前代被打破,线程被中断,等待超时
            for (;;) {
                try {
                    // 没有指定超时
                    if (!timed)
                        trip.await();
                    // 指定超时
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    /*
                     * 什么时候抛出中断异常:当前节点在条件队列没在同步队列的时候
                     * 条件成立:
                     *      1. 当前代没有变化
                     *      2. 如果当前代没有被打破
                     *      3. 总结就是如果是老的代并且没有被打破
                     */
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
                /*
                 * 来到这里的情况:
                 *      1. 有其他线程执行了nextGeneration
                 *      2. 当前代被打破也会唤醒所有线程
                 *      3. 超时了
                 */
                
                // 当前代被打破
                if (g.broken)
                    throw new BrokenBarrierException();
                // 代更新了
                if (g != generation)
                    return index;
                // 超时了
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

最后

以上就是甜美白开水为你收集整理的07.CyclicBarrier源码解析CyclicBarrier源码解析的全部内容,希望文章能够帮你解决07.CyclicBarrier源码解析CyclicBarrier源码解析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(51)

评论列表共有 0 条评论

立即
投稿
返回
顶部