我是靠谱客的博主 魁梧毛衣,最近开发中收集的这篇文章主要介绍ThreadPoolExecutor - 线程池ThreadPoolExecutor1 Java构建线程的方式2 线程池的7个参数3 线程池的执行流程4 线程池属性标识5 execute方法执行6 Worker方法7. 线程池的创建方式,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

ThreadPoolExecutor

1 Java构建线程的方式

  1. 继承 Thread
  2. 实现 Runnable
  3. 实现 Callable
  4. 线程池方式

2 线程池的7个参数

int corePoolSize,	//核心线程数
int maximumPoolSize,	//最大线程数
long keepAliveTime,		//最大空闲时间
TimeUnit unit,			//时间单位
BlockingQueue<Runnable> workQueue,	//阻塞队列
ThreadFactory threadFactory,		//线程工厂
RejectedExecutionHandler handler	//拒绝策略

3 线程池的执行流程

在这里插入图片描述

4 线程池属性标识

4.1 线程池属性

// 是一个int类型的数值,表示:
//1.声明当前线程池的状态(5种) 
//	高3位:线程池状态 低29位:线程池线程个数
//2.声明线程池中的线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

	//29,方便后面做位运算
    private static final int COUNT_BITS = Integer.SIZE - 3;	

	//通过运算得出	
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
	// 111 代表正常接收处理任务
    private static final int RUNNING    = -1 << COUNT_BITS;	

	// 000 代表不接受新任务,但内部还处理已有任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;	

	// executor.shutdownNow 001 不接受新任务,内部任务不再处理,同时中断当前任务
    private static final int STOP       =  1 << COUNT_BITS;	

	// 010 过渡状态,线程池即将over
    private static final int TIDYING    =  2 << COUNT_BITS;	

	// 011 代表线程池已经结束
    private static final int TERMINATED =  3 << COUNT_BITS;	

    // Packing and unpacking ctl
	// 得到线程池的状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }

	//得到线程池线程数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }	

4.2 线程池状态转换

在这里插入图片描述

5 execute方法执行

execute方法

public void execute(Runnable command){
        //健壮性
        if (command == null)
            throw new NullPointerException();
        // 拿到32位的int
        int c = ctl.get();
        //工作线程数 < 核心线程数,则创建核心线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            // 创建核心线程失败,重新获取ctl,32位的int
            c = ctl.get();
        }
        // 如果处于RUNNING状态,则添加任务到阻塞队列中
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次获取 ctl
            int recheck = ctl.get();
            // 再次判断是否是RUNNING状态,不是则移除任务
            if (! isRunning(recheck) && remove(command))
                reject(command);    //拒绝策略
            //如果处于RUNNING状态,并且工作线程数为0
            else if (workerCountOf(recheck) == 0)
                //阻塞队列有任务但没有工作线程,则添加一个任务为空的工作线程处理阻塞队列中的任务
                addWorker(null, false);
        }
        // 不处于RUNNING状态则创建非核心线程处理任务
        else if (!addWorker(command, false))
            //创建失败则拒绝任务
            reject(command);
    }

addWorker方法

/**
     * 创建工作线程
     * @param firstTask 传入任务
     * @param core  需要创建的是否是核心线程
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // 此for循环的目的是经过一些判断,给工作线程标识数+1
        for (;;) {
            // 获取32位int
            int c = ctl.get();
            // 获取线程池状态
            int rs = runStateOf(c);
            /*  可能创建成功的条件:
               1.  RUNNING下可能创建成功
               2. SHUTDOWN状态下可以为阻塞队列任务创建工作线程
                    当传入firstTask为null时,会去处理阻塞队列的任务,
                    所以只有当 firstTask == null && !workQueue.isEmpty()时才会创建工作线程
             */
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                // 无需创建工作线程
                return false;

            for (;;) {
                // 获取工作线程数
                int wc = workerCountOf(c);
                /* 如果工作线程数大于线程池容量(固定容量)
                    或者需要创建核心线程,但核心线程数已满
                    或者工作线程数大于最大线程数(自定义,不超过容量)
                    则退出创建工作线程
                 */
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 工作线程数+1,CAS方式
                if (compareAndIncrementWorkerCount(c))
                    break retry;    //如果失败则结束外侧循环
                c = ctl.get();  // 重新获取ctl,32位int
                //重新获取的当前状态是否与之前获取的状态一致
                if (runStateOf(c) != rs)
                    //不一致则结束内侧循环,进行下一次外侧循环重新获取状态
                    continue retry; 
            }
        }

        boolean workerStarted = false;  // 表示工作线程还未启动
        boolean workerAdded = false;   // 表示工作线程还未添加成功
        Worker w = null;    // Worker是工作线程
        try {
            w = new Worker(firstTask);  // 创建工作线程
            final Thread t = w.thread;  // 获取到线程对象
            if (t != null) {    // 一般不会为null
                // 加锁是为了避免在创建过程中其它线程把线程池停掉
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get()); // 重新获取线程池状态

                    // 如果线程池是RUNNING状态或(SHUTDOWN状态且firstTask为空)
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // 检查线程t是否已经启动,应该还没有启动
                            throw new IllegalThreadStateException();
                        // 将工作线程添加到工作线程集合(HashSet集合)中
                        workers.add(w); 
                        int s = workers.size();
                        // 记录工作线程数量达到过的最大值
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 工作线程添加成功
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();  // 启动线程
                    workerStarted = true;   // 线程启动成功
                }
            }
        } finally {
            if (! workerStarted)    // 启动失败则执行此方法
                addWorkerFailed(w);
        }
        return workerStarted;
    }

6 Worker方法

worker类实现了Runnable接口,实现了run方法

Worker()

Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
public void run() {
            runWorker(this);
        }

runWork()

final void runWorker(ThreadPoolExecutor.Worker w) {
        // 获取当前线程
        Thread wt = Thread.currentThread();
        // 拿到任务
        Runnable task = w.firstTask;
        // AQS先不关注
        w.firstTask = null;
        w.unlock(); // allow interrupts
        // 标识为true
        boolean completedAbruptly = true;
        try {
            // 有任务则执行
            // 没任务则getTask()从阻塞队列获取任务,拿不到则阻塞在此,直到超时或核心线程一直阻塞
            while (task != null || (task = getTask()) != null) {
                w.lock();   // 加锁,避免shutdown任务中断,shutdown需要拿到全局锁
                if ((runStateAtLeast(ctl.get(), STOP) ||    // 当前状态大于等于STOP
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);    // 自定义前值增强
                    Throwable thrown = null;
                    try {
                        task.run(); // 执行任务
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);     // 自定义后置增强
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 线程执行之后的后续处理
            processWorkerExit(w, completedAbruptly);
        }

7. 线程池的创建方式

// 7 种创建发方式

//最基础的创建方式
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
    1, //corePoolSize 核心线程数
    10, //maximumPoolSize 最大线程数
    10L, //keepAliveTime 空闲线程最大存活时间
    TimeUnit.MINUTES, //timeUtil 存活时间的时间单位
    new ArrayBlockingQueue<Runnable>(50), //workQueue 阻塞队列
    new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return null;
        }
    },
    new ThreadPoolExecutor.DiscardOldestPolicy() //handler 拒绝策略
);

//其余6种方式

//一个活动线程,无边界队列,任务一个一个排序执行
ExecutorService singlePool = Executors.newSingleThreadExecutor();
//缓存线程以便于重用,缓存空了则新建线程,线程空闲60s则踢出缓存,处理大量短时任务,消耗资源少
ExecutorService cachePool = Executors.newCachedThreadPool();
//创建指定线程数的线程池,定时执行任务
Executors.newScheduledThreadPool(4);
//保持nThread个活动线程,有线程退出则新建补齐,任务过多时多余的任务存阻塞队列
Executors.newFixedThreadPool(4);
//并行处理任务,不保证顺序,java8才加入此创建方法
Executors.newWorkStealingPool();
//创建单线程池,定时执行任务
Executors.newSingleThreadScheduledExecutor();

最后

以上就是魁梧毛衣为你收集整理的ThreadPoolExecutor - 线程池ThreadPoolExecutor1 Java构建线程的方式2 线程池的7个参数3 线程池的执行流程4 线程池属性标识5 execute方法执行6 Worker方法7. 线程池的创建方式的全部内容,希望文章能够帮你解决ThreadPoolExecutor - 线程池ThreadPoolExecutor1 Java构建线程的方式2 线程池的7个参数3 线程池的执行流程4 线程池属性标识5 execute方法执行6 Worker方法7. 线程池的创建方式所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(48)

评论列表共有 0 条评论

立即
投稿
返回
顶部