我是靠谱客的博主 坚强红酒,最近开发中收集的这篇文章主要介绍看了这篇文章,还不理解线程池执行流程,过来找我要钱,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

目录

    • 为什么使用线程池
    • 线程池创建
    • 核心参数
    • 提交任务流程
    • 任务什么时候执行

为什么使用线程池

实际编程中,频繁创建和销毁线程开销很大,所以一般使用线程的方式是线程池。

很方便的,java给我们提供了现成的线程池创建函数ThreadPoolExecutor,这个创建函数也成了不少公司面试必考题,当然,要想彻底理清线程池执行过程,需要剖析源码,这里我们就来仔细分析分析。

线程池创建

首先是线程池创建函数。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

可以把线程池类比成一个小公司,公司有少量正式员工,执行平时的一些工作量。如果工作量太大,正式员工忙不过来,会雇佣部分外包人员。
原则上,如果任务量突然变多,先把任务临时缓存起来,等正式员工有空闲时,交由正式员工由处理(节约成本嘛)。如果缓存队列满了,这时候就要考虑找外包人员了。
公司总预算有限,所以正式员工数量是固定的,且雇佣的外包人员也有最大人数限制。
如果工作量变少,为了节约成本,就要释放部分外包人员。
如果工作量实在太大了,正式员工、外包人员也达到最大预算人数,且所有人都在拼命完成工作任务,这时候,就要拒绝一部分任务了。

核心参数

接下来这几个参数就好理解了。

  • corePoolSize: 核心线程数量,可以类比正式员工数量,常驻线程数量。
  • maximumPoolSize: 最大的线程数量,公司最多雇佣员工数量(包含外包人员)。常驻+临时线程数量。
  • workQueue:任务等待队列,所有的正式员工都在处理任务,再来任务就先放到队列吧,队列如果也满了,那就要找外包了。
  • keepAliveTime:非核心线程空闲时间,就是外包人员等了多久,如果还没有活干,就被解雇了。
  • threadFactory: 创建线程的工厂,在这个地方可以统一处理创建的线程的属性。比如每个员工的名字,工号不一致,方便区分和安排任务。
  • handler:线程池拒绝策,什么意思呢?就是当任务实在是太多,人也不够,需求池也排满了,还有任务咋办?默认是不处理,抛出异常告诉任务提交者,我这忙不过来了。

提交任务流程

我们使用线程池的时候,一般是直接调用execute,提交一个任务,对应到线程池里是怎么处理的呢?

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {  //关键步骤1
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {  //关键步骤2
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))  //关键步骤3
        reject(command);
}

首先得理解这个ctl是什么意思?

线程池里为了充分利用int型的每一位,使用一个AtomicInteger的ctl来记录线程池中线程的数量及当前线程池的状态。低29bit位表示线程池中线程的数量,高3bit位用来记录线程池的状态是RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED中的一种。

添加任务步骤

  • 获取线程池中线程数量。

  • 线程数量小于核心线程数吗。

    • 小于核心线程数,添加一个核心线程,添加成功的话直接返回。
    • 如果添加核心线程失败,因为是多线程同时执行,再获取一遍线程池中线程数量,继续下一步。
  • 线程数量大于等于核心线程数或上面添加核心线程失败。

    • 线程池还在运行且添加任务到任务队列成功。
      • 重新检查线程池是否还在运行
        • 线程池不在运行,且从任务队列删除任务成功,拒绝该任务。
        • 线程池在运行,但线程池中没有线程(核心线程数也可以设置成0),添加一个非核心线程。
      • 线程池不在运行或添加任务到任务队列失败
        • 尝试添加非核心线程去处理该任务。
        • 如果添加非核心线程失败,拒绝该任务。

整个过程有点绕,可以对比图形,再理解一遍。

在这里插入图片描述

添加任务最关键的函数就是addWorker。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

要理解这个问题,首先要知道java里的标签是怎么用的,有点类似goto的意思,可以参考:https://blog.csdn.net/chanllenge/article/details/90266538

addWorker的核心思想是

  • 添加线程时时,区分核心和非核心线程,并可指定该线程的第一个处理任务。
  • 判断线程池状态及工作队列,线程数量等参数的合法性。
  • 通过CAS自旋方式,增加线程数量。
  • 加悲观锁,并结合参数合法性,添加一个线程worker,到线程队列workers。
  • 线程添加成功且正常启动,返回true,其他情况,添加线程失败,移除该线程,并线程数量减1,返回false。

线程池中,通过一个set来存储所有的线程。

/**
 * Set containing all worker threads in pool. Accessed only when
 * holding mainLock.
 */
private final HashSet<Worker> workers = new HashSet<Worker>();

任务什么时候执行

线程添加成功之后,该线程首先会执行指定的第一个处理任务,然后从工作队列的队首依次取任务去执行。

源代码:

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

看到这里,线程池的创建和执行你理解了吗?欢迎留言讨论!

@博乐

最后

以上就是坚强红酒为你收集整理的看了这篇文章,还不理解线程池执行流程,过来找我要钱的全部内容,希望文章能够帮你解决看了这篇文章,还不理解线程池执行流程,过来找我要钱所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(75)

评论列表共有 0 条评论

立即
投稿
返回
顶部