我是靠谱客的博主 着急眼睛,最近开发中收集的这篇文章主要介绍线程池ThreadPoolExecutor源码解析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

一、简述

线程池维护着一定的线程数量,通过线程复用减少了线程频繁开启销毁的消耗。
当新任务提交到线程池时:
如果线程池中运行线程数量小于核心线程数量,启动新线程执行提交的任务。
如果线程池中运行线程数量大于或等于核心线程数量,且队列未满,将任务加入队列。此时再次检查,如果没有运行中的线程,启动新线程执行队列中的任务。
如果线程池中运行线程数量大于或等于核心线程数量,但小于最大线程数量,且队列已满,启动新线程执行队列中的任务。
如果线程池中运行线程数量等于最大线程数量,且队列已满,执行拒绝策略。

二、ThreadPoolExecutor属性

1. ctl&wc&rs

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

 // runState is stored in the high-order bits
 private static final int RUNNING    = -1 << COUNT_BITS;
 private static final int SHUTDOWN   =  0 << COUNT_BITS;
 private static final int STOP       =  1 << COUNT_BITS;
 private static final int TIDYING    =  2 << COUNT_BITS;
 private static final int TERMINATED =  3 << COUNT_BITS;

 // Packing and unpacking ctl
 private static int runStateOf(int c)     { return c & ~CAPACITY; }
 private static int workerCountOf(int c)  { return c & CAPACITY; }
 private static int ctlOf(int rs, int wc) { return rs | wc; }

wc:workCount,可以理解为workCount,最多为2^29 - 1。
rs:runState,表示允许状态,对应RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。
ctl:线程池的状态标志,由wc和rs组合而成。

2. handler

private volatile RejectedExecutionHandler handler;
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

handler为RejectedExecutionHandler接口类型。
该接口只有1个方法:void rejectedExecution(Runnable r, ThreadPoolExecutor executor);用于表示拒绝策略,当线程池中的任务数量超限时,新任务提交后将进入拒绝策略的逻辑。
ThreadPoolExecutor提供了4个RejectedExecutionHandler的实现类,分别是:

(1) AbortPlicy:抛异常RejectedExecutionException

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

(2)DiscardPolicy:不做任何处理

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}

(3)DiscardOldestPolicy:放弃队列中的第一个任务,新任务加入队列

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}

(4)CallerRunsPolicy:直接执行新任务

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
   if (!e.isShutdown()) {
        r.run();
    }
}

3. workQueue

workQueue为BlockingQueue类型
当线程池中核心线程数小于任务数时,会将提交的任务保存在workQueue中。
在线程池中主要使用了以下3类BlockingQueue。

(1)有界队列

这一类队列中存放的任务数量有限制,因此称为有界队列。
常用的有界队列是ArrayBlockingQueue和PriorityBlockingQueue,前者适合于FIFO的执行顺序的场景,后者适合于按照优先级的执行顺序的场景。

(2)无界队列

这一列队列中可以存放任意数量的任务,因此称为无界队列。
有用的无界队列为LinkedBlockingDeque

(3)同步队列

主要指SynchronousQueue,不存放任何的任务。

4. threadFactory

threadFactory类型为ThreadFactory接口,是Thread的工厂类,用于根据runnable生成thread。接口中只有一个方法:Thread newThread(Runnable r);

(1)DefaultThreadFactory

线程池中最常使用的ThreadFactory实现类是Executors的静态内部类DefaultThreadFactory。

static class DefaultThreadFactory implements ThreadFactory {
   private static final AtomicInteger poolNumber = new AtomicInteger(1);
   private final ThreadGroup group;
   private final AtomicInteger threadNumber = new AtomicInteger(1);
   private final String namePrefix;

   DefaultThreadFactory() {
       SecurityManager s = System.getSecurityManager();
       group = (s != null) ? s.getThreadGroup() :
                             Thread.currentThread().getThreadGroup();
       namePrefix = "pool-" +
                     poolNumber.getAndIncrement() +
                    "-thread-";
   }

   public Thread newThread(Runnable r) {
       Thread t = new Thread(group, r,
                             namePrefix + threadNumber.getAndIncrement(),
                             0);
       if (t.isDaemon())
           t.setDaemon(false);
       if (t.getPriority() != Thread.NORM_PRIORITY)
           t.setPriority(Thread.NORM_PRIORITY);
       return t;
   }
}

(2)Worker

threadFactory在线程池中的用处就是在Worker的构造器中。
Worker是对提交到线程池的任务的封装。

Worker(Runnable firstTask) {
   setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

state设置为-1是为了防止中断方法

void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

5. corePoolSize&maximumPoolSize

corePoolSize:核心线程数
maximunPoolSizw:最大线程数

6. keepAliveTime&allowCoreThreadTimeOut

keepAliveTime:线程的超时时间,单位纳秒,线程超时
allowCoreThreadTimeOut:是否允许核心线程超时,默认false。

三、ThreadPoolExecutor构造方法

ThreadPoolExecutor构造方法
ThreadPoolExecutor提供了4个构造方法,且都是前3个通过调用第4个构造方法进行实现。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

下面是构造方法的几个参数的介绍:
corePoolSize&maximumPoolSize:对应成员变量corePoolSize&maximumPoolSize
keepAliveTime&unit:超时时间值和时间单位,两者共同确定了成员变量keepAliveTime
workQueue:对应成员变量workQueue
threadFactory:对应成员变量threadFactory,构造方法1和2采用DefaultThreadFactory对象
handler:对应成员变量handler,构造方法1和3采用AbortPolicy对象。

四、ThreadPoolExecutor任务的执行

1. ThreadPoolEecutor#execute(Runnable)

ThreadPoolEecutor#execute(Runnable)负责新任务的执行策略。
execute方法是ThreadPoolExecutor的执行方法的入口,方法参数要求是Runnable类型,如果需要提交Callable任务,则需要通过submit提交任务。
sumbit有多个重载方法,可以接受Runnable和Callable两种类型的参数,submit方法会将参数转化为FutureTask对象,FutureTask类同时实现了Runnable和Callable接口,转化后再在方法中调用execute方法。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

按照前面介绍的提交线程任务的逻辑进行处理:
如果线程池中运行线程数量小于核心线程数量,启动新线程执行提交的任务。
如果线程池中运行线程数量大于或等于核心线程数量,且队列未满,将任务加入队列。此时再次检查,如果没有允许中的线程,启动新线程执行队列中的任务。
如果线程池中运行线程数量大于或等于核心线程数量,但小于最大线程数量,且队列已满,启动新线程执行队列中的任务。
如果线程池中运行线程数量等于最大线程数量,且队列已满,执行拒绝策略。
具体到代码,ThreadPoolExecutor的处理流程图如下:
ThreadPoolExecutor#execute(Runnable)流程图
在将任务加入队列后,再次检查了线程池状态和运行线程数
(1)如果线程池状态不在时Running,将任务从队列中移除,执行拒绝策略。确保线程池对新提交的任务的响应是正确的。
(2)如果线程池中没有运行中的线程(在任务加入队列的过程中所有任务全部执行完成),新建新线程,确保线程池队列中的任务能够得到执行。

2. ThreadPoolExecutor#addWorker(Runnable, boolean)

ThreadPoolExecutor#addWorker(Runnable, boolean)负责新建线程。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

该方法具有两个参数,第一个参数表示线程对应的任务,第二个表示新建的线程是否是核心线程。
从代码上可以看出,该方法可以分成两部分:
第一部分是一个双重循环,主要判断能否新建线程并修改ctl的值。
第二部分则通过lock锁尝试启动线程,并修改相关的标志变量。
(1)在第一部分中主要根据两个判断条件确定能否新建线程:
(1.1)线程池的状态

if (rs >= SHUTDOWN &&
 ! (rs == SHUTDOWN &&
      firstTask == null &&
      ! workQueue.isEmpty()))
   return false;

把这个判断条件进行非运算,看一下可以启动新线程的状态,这样子更好理解这个判断条件

!(rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
= rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))

可以看出,这个判断条件对应前面对线程池状态的描述:
当状态为RUNNING时,线程池可以执行队列中的任务和新提交的任务。
当状态为SHUTDOWN时,线程池只处理队列中的任务,不处理新提交的任务。
其它状态时,队列中的任务和新提交的任务都不执行。
(1.2)运行线程数量

if (wc >= CAPACITY ||
  wc >= (core ? corePoolSize : maximumPoolSize))
  return false;

线程池中的线程数量不能超过对应的限制。

ctl记录了线程池中的运行线程数量,新建线程后由compareAndIncrementWorkerCount(c)方法进行增加,由于使用的是CAS方式,自然需要把它放在循环里。
如果线程池中仅仅运行线程数量发生了变化,执行内层循环即可;如果线程池状态也发生了变化,则需要从外层循环开始。
(2)在第二部分中负责线程的启动
程序中,将任务封装成Worker对象,通过lock锁保证线程安全,workerStarted和workerAdded两个布尔变量分别表示是否启动和是否添加成功。
由于只有addWorker方法才可以新建线程,而在第一步的双重循环中已经保证了线程数量的可靠性,因此在第二部分中只需要对线程池状态进行验证,不需要验证运行线程数量。
对于添加成功的线程,通过t.start();进行启动。
对于没有添加成功的线程,通过addWorkerFailed(w);进行回滚。

/**
  * Rolls back the worker thread creation.
  * - removes worker from workers, if present
  * - decrements worker count
  * - rechecks for termination, in case the existence of this
  *   worker was holding up termination
  */
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

在addWorkerFailed方法中,会馆主要有3步:
(1)任务从队列中移除
(2)减少ws
(3)尝试termanate线程池,在tryTerminate会检查当前线程池是否需要terminate

3. ThreadPoolExecutor#runWork(Worker)

在worker类中,其run方法委托ThreadPoolExecutor#runWork(Worker)方法,负责线程的启动与任务的执行。

final void runWorker(Worker w) {
     Thread wt = Thread.currentThread();
     Runnable task = w.firstTask;
     w.firstTask = null;
     w.unlock(); // allow interrupts
     boolean completedAbruptly = true;
     try {
         while (task != null || (task = getTask()) != null) {
             w.lock();
             // If pool is stopping, ensure thread is interrupted;
             // if not, ensure thread is not interrupted.  This
             // requires a recheck in second case to deal with
             // shutdownNow race while clearing interrupt
             if ((runStateAtLeast(ctl.get(), STOP) ||
                  (Thread.interrupted() &&
                   runStateAtLeast(ctl.get(), STOP))) &&
                 !wt.isInterrupted())
                 wt.interrupt();
             try {
                 beforeExecute(wt, task);
                 Throwable thrown = null;
                 try {
                     task.run();
                 } catch (RuntimeException x) {
                     thrown = x; throw x;
                 } catch (Error x) {
                     thrown = x; throw x;
                 } catch (Throwable x) {
                     thrown = x; throw new Error(x);
                 } finally {
                     afterExecute(task, thrown);
                 }
             } finally {
                 task = null;
                 w.completedTasks++;
                 w.unlock();
             }
         }
         completedAbruptly = false;
     } finally {
         processWorkerExit(w, completedAbruptly);
     }
 }

大致分为3部分
(1)初始化的准备工作:指for循环之前的代码块,主要是worker对象的复制并允许中断,completedAbruptly用来标志线程是正常完成任务结束的还是异常终止的。
(2)线程循环执行任务:指for循环的代码块,首先执行worker对象的任务,然后遍历获取队列中的任务进行执行。
循环体内单个任务的执行过程如下:

runWork方法循环内单个任务执行流程图
每个任务的处理过程中通过w.lock();w.unlock();保证执行过程中不被中断。在执行过程中通过判断语句if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt();
确保了:

If pool is stopping, ensure thread is interrupted;
f not, ensure thread is not interrupted. This
requires a recheck in second case to deal with
shutdownNow race while clearing interrupt

另外ThreadPoolExecutor还提供了前置方法beforeExecute和后置方法afterExecute方法给子类进行扩展。
(3)收尾工作:指finally语句的processWorkerExit(w, completedAbruptly);,委托processWorkerExit进行处理,赋值最后的清理和记录。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

分成4部分
(3.1)在异常终止的情况下,减少ws数量,通过CAS保证线程安全。看到这里可能有疑问,正常完成的情况下不需要减少ws数量吗?其实也是需要的,只是不在这儿调用而已,具体可以在ThreadPollExecutor#getTask()方法中看到。
(3.2)记录完成的任务数量completedTaskCount ,通过lock锁保证线程安全。
(3.3)tryTerminate:前面已经简单介绍过,在满足判断条件的情况下进行terminate
(3.4)确保运行线程数量正确。
如果异常终止,新建线程替代。
如果正常结束,确认线程池中最少需要的线程数,判断当前运行数量是否满足要求,如果不满足则新建一个线程。

4. ThreadPollExecutor#getTask()

ThreadPollExecutor#getTask()负责从队列中获取下一个任务。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

以下几种情况:
(1)线程池状态不符合的情况下返回null

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    decrementWorkerCount();
    return null;
}

(2)从队列中获取,可能超时用poll,不会超时用task,返回获取到的任务。
只有当(wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())时返回null。保证如果队列不为空的情况下至少有一个运行线程。

可以看到,所有返回null的场景下都将ws的数量减1,因此在后续的processWorkerExit方法中正常结束的场景下不需要处理ws数量。

五、线程池的任务提交

ThreadPoolExecutor继承了AbstractExecutorService,除了可以调用Execute方法直接执行任务以外,还可以通过submit方法进行任务提交。
submit共有3个重载方法,

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

newTaskFor也有两个重载方法

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

可以看到,其主要逻辑就是生成一个Runnable和Callable的适配类对象,调用execute方法并返回生成的适配类对象。
在线程池中,为了兼容Runnbale和Callable,设计了一些适配类。
线程相关类图

六、线程池的状态及切换

1. 线程的状态

线程池的主要功能就是管理线程,因此先了解一下线程的状态。
线程状态图

2. 线程池的状态

前面已经简单提到过线程池的几个状态。
RUNNING:线程池的初始状态,可以提交任务,能够执行队列中的任务和新提交的任务。
SHUTDOWN:能够执行队列中的任务,不能提交新的任务。
STOP:会中断所有执行中的线程,不再执行任务。
TIDYING:所有的线程都终止后,线程池中没有任何任务,线程池进入TIDYING状态。
TERMINATED:线程池的终态。线程池状态图

3. 线程池的状态切换

3.1 ThreadPoolExecutor#shutdown()

该方法可以实现RUNNING >> SHUTDOWN的状态切换。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    	//1. 检查操作权限
        checkShutdownAccess();
        //2. 修改状态
        advanceRunState(SHUTDOWN);
        //3. 中断线程
        interruptIdleWorkers();
        //4. 空方法,给ScheduledThreadPoolExecutor预留的扩展方法
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
//cas方式修改状态值,状态值只能向上改变。
private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}
private void interruptIdleWorkers() {
   interruptIdleWorkers(false);
}
//通过 t.interrupt();终端线程,但是不保证停止线程。如果线程没有interrupt的检查和响应函数时无法停止
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

3.2 ThreadPoolExecutor#shutdownNow()

该方法可以实现RUNNING|SHUTDOWN >> STOP的状态切换

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    	//1. 检查操作权限
        checkShutdownAccess();
        //2. 修改状态
        advanceRunState(STOP);
        //3. 中断线程
        interruptWorkers();
        //4. 队列中移除并返回任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
//首先使用drainTo移除任务,如果无法移除,则通过遍历的remove进行移除
private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}

3.3 ThreadPoolExecutor#tryTerminate()

该方法包括了STOP|SHUTDOWN >> TIDYING >> TERMINATED的两步状态切换。
在前面的分析中,我们已经看到不少的方法里调用的tryTerminate方法。这是因为TIDYING和TERMINATED并不能像SHUTDOWN和STOP一样直接通过一个对外开放的api进行状态的切换,所以需要在执行过程了进行检查是否需要进行状态切换。只有原状态为STOP|SHUTDOWN,且没有线程和任务时才能进行切换。

final void tryTerminate() {
     for (;;) {
         int c = ctl.get();
         if (isRunning(c) ||
             runStateAtLeast(c, TIDYING) ||
             (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
             return;
         if (workerCountOf(c) != 0) { // Eligible to terminate
             interruptIdleWorkers(ONLY_ONE);
             return;
         }

         final ReentrantLock mainLock = this.mainLock;
         mainLock.lock();
         try {
             if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                 try {
                     terminated();
                 } finally {
                     ctl.set(ctlOf(TERMINATED, 0));
                     termination.signalAll();
                 }
                 return;
             }
         } finally {
             mainLock.unlock();
         }
         // else retry on failed CAS
     }
 }

terminated()也是一个留给子类扩展的钩子函数,ThreadPoolExecutor本身没有任何操作,执行完钩子函数后,进行TIDYING >> TERMINATED的切换。

七、线程数量的选择

在线程池中,一个合理的核心线程数和最大线程数的选择是保证并发效率的重要因素,一般来说,有这么一个经验公式:
线程数 = CPU数量 * 计划的CPU使用率 * (1 + 线程的CPU等待时间 / CPU使用时间)。
线程的CPU等待时间越长,可以设计更多的线程数。
因此CPU密集型的,线程数设计的要少。CPU不密集的,譬如IO密集的,线程数可以设计的大一些。
简单点说
(1)并发高,执行快:可以简单的理解为CPU不需要等待,线程数 = CPU数 + 1,减少线程上下文的切换。
(2)并发不高,执行慢:
(2.1)执行时间集中在CPU操作上,线程数需要设计的少一些,减少线程上下文的切换。
(2.2)执行时间集中在非CPU操作上(譬如IO),线程数可以设计的大一些,减少CPU的空闲时间。
(3)并发高,执行慢:不是简单的设计线程数就可以提高效率。

最后

以上就是着急眼睛为你收集整理的线程池ThreadPoolExecutor源码解析的全部内容,希望文章能够帮你解决线程池ThreadPoolExecutor源码解析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(89)

评论列表共有 0 条评论

立即
投稿
返回
顶部