线程池-ThreadPoolExecute源码分析
java.util.concurrent.ThreadPoolExecutor 核心线程池,继承自AbstractExecutorService,实现了ExecutorService和Executor接口。
线程池的好处
- 降低性能消耗、提高响应速度:对于应用需要频繁创建线程,而且线程任务都比较简单,比如一些IO任务,线程的生命周期都很短;而线程的创建需要花费一定的CPU时间,所以当任务到来时如果线程已经准备就绪了,而不是重新创建,则会大大提高系统的响应速度。
- 对线程的集中管理监控:将创建的线程规约在线程池里,则可以对线程的数量和运行状态进行管理并进行监控,可对系统的线程资源进行集中管理。
线程池原理
源码API文档的描述如下
- Core and maximum pool sizes
ThreadPoolExecutor会根据corePoolSize以及maximumPoolSize的边界自动的调整线程池的大小。
1)当通过execute(Runnable)提交任务时,而且正在运行的线程数少于corePoolSize,即使其他线程处于空闲状态,也会创建一个新的线程执行这个任务;2)如果有大于corePoolSize但是小于maximumPoolSize数量的线程正在运行,则新提交的任务会放进workQueue进行任务缓存,但是如果workQueue已满,则会直接创建线程执行,但是如果创建的线程数大于maximum pool sizes的时候将拒绝任务。
3)** 当corePoolSize和maximumPoolSize 相等时则会创建固定数量的线程池
4)将maximumPoolSize 设置为无边界的**,比如整数的最大值,则意味着线程数和任务数量一致,也就没有等待的任务
5)corePoolSize、maximumPoolSize可以根据实际需求通过构造器设置,也可以动态的在运行时设置。
- On-demand construction
按照需求构造线程
1)默认情况下,每一个核心线程只有当有新任务到来时才会初始化创建,并执行
2)但是可以在运行时可以通过prestartCoreThread(一个coreThread)或者prestartAllCoreThreads(全部coreThread)来提前创建并运行指定的核心线程,这种需求适用于初始化线程池时,任务队列初始不为空的情况下。
- Creating new threads
创建线程
1)创建线程是通过ThreadFactory。除非特别的设定,否则默认使用Executors.defaultThreadFactory作为线程池,这个线程池创建的所有线程都有相同的线程组,线程优先级,非守护线程的标志
2)通过应用不同的线程池,可以更改线程的名字,线程组,优先级,守护标志等等
3)当通过newThread()调用线程池创建线程池失败时,返回null,此时执行器会继续运行,但是可能处理不了任何任务
4)线程需要处理"modifyThread" RuntimePermission,对线程修改进行运行时权限检查。如果使用这个线程池的工作线程或者其他线程没有处理这个认证"permission"则会使服务降级:对于线程池的所有设置都不会及时的生效,一个已经关闭的线程池可能还会处于一种线程池终止没有完成的状态
- Keep-alive times
非core线程的空闲存活时间
1)当这个线程池此时含有多余corePoolSize的线程存在,则多余的线程在空闲了超过keepAliveTime的时间将会被终止
2)这提供了一种减少空闲线程从而降低系统线程资源损耗的方法,还可以通过setKeepAliveTime进行动态设置
3)默认情况下,keep-alive policy只对超出corePoolSize的线程起作用,但是可以通过方法allowCoreThreadTimeOut(boolean)将空闲超时策略同样应用于coreThread,但是要保证超时时间不为0值
- Queue
任何BlockingQueue都可以被用来容纳和传递提交的任务
1)如果正在运行的线程小于corePoolSize,则executor会新增一个线程而不是将任务入队
2)如果正在运行的线程大于corePoolSize但是小于maximumPoolSize,executor则会将任务入队,而不是创建一个线程
3)如果任务不能入队(队列已满),则在没有超出maximumPoolSize的情况下创建一个新的线程,否则某种策略拒绝这个任务。
- three general strategies for queuing
三种入队策略
1)Direct handoffs:直接传递。比如 synchronousQueue,这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
2)Unbounded queues:无界队列。比如 没有指定容量的LinkedBlockingQueue,这将会使coreThread一直工作,而且由于任务总能入队,所以也不会创建其他超过corePoolSize的线程。用于所有任务完全独立,不相关,比如平滑瞬间高并发web页面的请求等,其实相当于异步框架了
3)Bounded queues:有界队列。 比如ArrayBlockingQueue,有助于在设置有限大的maximumPoolSizes时,阻止造成系统资源的枯竭。
队列大小和最大池大小可能需要相互折衷:使用大队列和小池最大限度地减少CPU的使用,操作系统资源,和上下文切换开销,但可能会导致人为的低吞吐量。如果任务经常被阻塞(例如,如果它们是I/O绑定),系统可能比你允许的时间安排更多线程的时间。使用小队列通常需要更大的池大小,这使得CPU繁忙,但可能会遇到不可接受的调度开销,这也降低吞吐量。
- Rejected tasks
当提交一个新任务时,如果Executor已经关闭或者有限的workQueue,maximumPoolSizes,并且他们已经饱和了,只要出现其中一种情况都会被拒绝。有四种已经定义的处理策略
1
2
3
4
5
6ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出 RejectedExecutionException异常。默认 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
还可以继承RejectedExecutionHandler自定义实现
- Hook methods
提供在每个任务执行时不同阶段执行不同的处理函数
- protected void beforeExecute(Thread t, Runnable r):优先使用指定的线程处理给定的任务,并在任务执行前做一些处理(如设置ThreadLocal变量或者记录一些日志等),t为执行r任务的线程,r为提交的任务。
- protected void afterExecute(Runnable r, Throwable t):任务执行完成时处理。r为执行完的任务,t为指定的造成任务终止的异常,如果设置为null则执行会正常完成,不会抛出异常
- protected void terminated():当Executor终止时,被调用一次
以上三个方法都为空方法,使用者自行实现。在进行多层嵌套时都要显示调用 super.method() 完成上层的处理函数。如果在调用方法时发生异常,则内部的工作线程可能会依次失败,突然终止。
可以继承ThreadPoolExecute,并实现上述几个Hook方法来检测线程池的状态,自定义自己的线程池,如监控任务的平均、最大、最小执行时间,来发现有没有一致阻塞的线程任务。
ThreadPoolExecutor源码详解
1-类继承结构
public class ThreadPoolExecutor extends AbstractExecutorService
ThreadPoolExecutor 继承自AbstractExecutorService
理解ThreadPoolExecutor需要先理解下面的这些参数
2-线程池状态runState与workerCount
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits // 接收新的任务,并且执行缓存任务队列中的任务 private static final int RUNNING = -1 << COUNT_BITS; // 不再接收新的任务,但是会执行缓存任务队列中的任务 private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接受新的任务,也不执行缓存队列中的任务,并且中断正在运行的任务 private static final int STOP = 1 << COUNT_BITS; // 所有的任务已经终止,workCount为0,这个状态为暂时状态,之后将调用terminated() hook method private static final int TIDYING = 2 << COUNT_BITS; // terminated()方法调用完成 private static final int TERMINATED = 3 << COUNT_BITS;
使用**AtomicInteger **的CAS机制来实现对运行时状态以及工作线程计数的并发一致性操作,低29位(32-3)用来保存workerCount,所以workerCount的最大为2^29 -1 。高3位用来保存runState,这样实现具有较高效率。
线程池状态迁移
3-其他成员
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34// 缓存任务阻塞队列 private final BlockingQueue<Runnable> workQueue; // 线程池主锁,用于访问worker线程集,还有其他关于线程池信息的记录信息(比如线程池大小,runState) private final ReentrantLock mainLock = new ReentrantLock(); // 工作线程集合,访问时需获取mainLock private final HashSet<Worker> workers = new HashSet<Worker>(); // mainLock上的终止条件量,用于支持awaitTermination private final Condition termination = mainLock.newCondition(); // 记录曾经创建的最大线程数,访问需获取mainLock private int largestPoolSize; // 对已经完成任务进行计数,只有在工作线程终止时才会更新,访问需要获取mainLock private long completedTaskCount; /** * 以下所有变量都为volatile类型的,以便能使所有操作都基于最新值 * (因为这些值都可以通过对应的set方法,在运行时动态设置), * 但是不需要获取锁,因为所有内部一致性不依赖这些参数的同步访问来保证 */ // 用于创建新线程的线程工厂 private volatile ThreadFactory threadFactory; // 任务拒绝策略 private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; private volatile boolean allowCoreThreadTimeOut; private volatile int corePoolSize; private volatile int maximumPoolSize; // 设置默认任务拒绝策略 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); // 对于调用线程池的shutdown(),shutdownNow()方法权限认证 private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
内部Worker类实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
轻量级独占重入锁,主要防止在执行任务期间被中断干扰。并实现了Runable接口,run方法代理到外层runwork方法主循环上。
4-四种构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59// Public constructors and methods // 1. 设置缓存任务队列 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { // 调用构造函数4 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } // 2. 设置缓存任务队列,线程工厂 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { // 调用构造函数4 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } // 3. 设置缓存任务队列,拒绝策略 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { // 调用构造函数4 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } // 4. 底层的构造函数,提供其他构造函数包装的基础 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
前三种构造方式简化了最后一种线程池配置,提供了一些默认设置
5-execute方法详解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48public void execute(Runnable command) { if (command == null) // 空任务抛出异常 throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); // 1. 如果工作线程数小于核心线程数,则添加新的线程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; // 添加成功则返回 c = ctl.get(); // 否则获取线程池状态 } // 2. 工作线程数大于等于核心线程数,则将任务放入缓存任务队列 // 与操作: 如果线程池正在运行,而且成功将任务插入缓存任务队列两个条件 // 都满足则进入条件语句内 if (isRunning(c) && workQueue.offer(command)) { // 第一次检查 int recheck = ctl.get(); // 如果不处于运行状态,则将任务从任务缓存队列移除 if (! isRunning(recheck) && remove(command)) // 第二次检查 reject(command); // 拒绝任务 else if (workerCountOf(recheck) == 0) // 第二次检查通过 addWorker(null, false); // 添加无初始任务的线程 } // 3. 任务入队失败,说明任务缓存任务队列已满,尝试添加新的线程处理 // 如果添加失败则以某种方式拒绝任务 else if (!addWorker(command, false)) reject(command); }
对线程池运行状态进行判断,并执行相应的控制策略
6-内部实现addWorker方法详解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90private boolean addWorker(Runnable firstTask, boolean core) { // 对 runState进行循环获取和判断,如果不满足添加条件则返回false retry: for (;;) { // 获取runState,和workerCount int c = ctl.get(); int rs = runStateOf(c); // 对线程池状态进行判断,是否适合添加新的线程 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 进行CAS设置workerCount,失败重试 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // CAS 设置成功,则跳出最外层循环 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // 如果workerCount没有设置成功,而且runState发生变化, // 则继续最外层的循环,对runState重新获取和判断 if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 添加新线程需获取全局锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 获取锁后,对runState进行再次检查 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); // 启动线程 workerStarted = true; } } } finally { // 因为中途发生异常而没有让添加的线程启动,则回滚 if (! workerStarted) addWorkerFailed(w); // 线程回滚 } return workerStarted; } /** * Rolls back the worker thread creation. */ private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; // 对线程回滚需要获取全局锁 mainLock.lock(); try { if (w != null) workers.remove(w); // 从工作线程集合中移除添加失败的线程 decrementWorkerCount(); // 减少工作线程计数 // 因为中断异常而没有启动线程,从而回滚已入队的线程 // 这个中断异常可能是关闭线程池时发生的,所以应该将终止线程池的信号传播 tryTerminate(); // 尝试终止线程 } finally { mainLock.unlock(); } }
- 在外循环对运行状态进行判断,内循环通过CAS机制对workerCount进行增加,当设置成功,则跳出外循环,否则进行进行内循环重试
- 外循环之后,获取全局锁,再次对运行状态进行判断,符合条件则添加新的工作线程,并启动工作线程,如果在最后对添加线程没有开始运行(可能发生内存溢出,操作系统无法分配线程等等)则对添加操作进行回滚,移除之前添加的线程
这里提一下tryTerminate()方法,对当前情况的一种预判:执行上述操作可能是采用中断来关闭线程池的一种可能,所以得调用tryTerminate()方法传播关闭的信号,这个方法主要的功能就是中断等待在阻塞队列的Worker线程去检查是否线程池终止或者配置的变化,当线程池在终止之前或者工作线程为0时调用terminated()钩子方法,然后通知唤醒等待线程池终止的Caller线程。
7-工作线程主循环函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 将线程初始任务设为null w.unlock(); // 执行任务前,释放锁,允许被中断 boolean completedAbruptly = true; // 因为运行异常导致线程突然终止的标志 try { // 获取任务,如果没有任务可以获取,则此循环终止, // 这个工作线程将结束工作,等待被清理前的登记工作 while (task != null || (task = getTask()) != null) { w.lock(); // 执行任务之前获取工作线程锁 // 如果线程池关闭,则确保线程被中断 // 如果线程池没有关闭,则确保线程不被中断 // 这就要求在第二种情况下,进行重新检查,处理shutdownNow正在运行同时清除中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); // before hook Throwable thrown = null; try { task.run(); // 运行任务 // 抛出异常,导致当前任务终止,工作线程退出 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); after hook } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // (可能是因为没有任务可做正常结束,或者产生异常而导致线程异常结束) // 登记信息,移除结束线程,然后根据情况添加新的线程等 processWorkerExit(w, completedAbruptly); } } // 处理线程退出函数 private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // 如果是用户代码task.run()异常突然退出 decrementWorkerCount(); // 则减少 workerCount的计数 // 否则为正常退出,在getTask函数里已经进行了减少workerCount的操作 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); // 获取全局锁 try { completedTaskCount += w.completedTasks; // 登记信息 workers.remove(w); // 移除线程 } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { // 线程池是否处于运行状态 if (!completedAbruptly) { // 正常退出 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; // allowCoreThreadTimeOut 情况下需要的最少线程数 // 如果线程数量大于等于正常工作的数量则不再添加新的线程 if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); // 添加worker线程 } }
-
Worker工作者类继承了AQS基于独占锁实现了Worker线程的同步语义,而没有通过ThreadPoolExecute的全局重入锁来保证同步,得到了性能上的优化,让锁的粒度更精细。Worker具有锁的语义是想让工作线程在没有正在执行任务时(特别是当从阻塞队列获取task时被阻塞)能够被在终止线程池时一定能感知外界的变化,所以采用了锁的语义,而且同时能保证任何外部因素不能打扰正在执行task的工作线程,除非调用中断线程的Thread.interrupt()方法能被task.run()方法内部响应,比如在run方法内检查中断标志,如果发生中断则抛出异常,则会导致工作线程退出。
-
通过循环,获取队列中的任务,在获取锁之后首先对线程池状态进行判断,并执行任务,在任务执行完毕则释放锁,所以在空闲时间,这个锁可以被其他方法获取,从而实现对空闲线程的中断,而对正在执行任务的线程则需要通过interrupt()方法来中断(详见shutdownNow()方法)
-
当无法通过getTask()获取任务时,或则执行任务期间发生异常(用户代码异常),则执行processWorkerExit()方法,移除已经结束或则突然死亡的工作线程,然后根据情况添加新的线程等。
-
需要注意的是:worker线程执行任务时获取的worker内部基于AQS同步器实现的锁,而不是全局锁,所以当线程池完成预热(线程数达到corePoolSize)时,执行任务不再需要获取全局锁,增加了任务的吞吐量,避免获取全局锁带来的性能瓶颈。
8-getTask 从缓存队列获取任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43private Runnable getTask() { boolean timedOut = false; // 记录上一次获取任务是否超时的标志 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); // 因为上述条件无法获取任务,则减少workerCount数量 return null; // 返回 null,导致工作线程主循环结束,并移除该工作线程 } int wc = workerCountOf(c); // 根据线程数量判断是否工作线程需要被移除 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 包含对超时的判断,如果发生超时,则说明该worker已经空闲了 // keepAliveTime时间,则应该返回null,这样会使工作线程正常结束, // 并被移除 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 超时获取任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 如果在keepAliveTime时间内获取到任务则返回 if (r != null) return r; // 否则将超时标志设置为true timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
- 通过死循环来对线程池状态进行判断,并获取任务,在超时发生之前发生中断则重置超时标志位false并进行重试,如果获取到任务则返回任务
- 主要来看一下是如何实现移除空闲keepAliveTime线程的:
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
方法从任务队列中定时获取任务,如果超时,则说明线程已经在等待了keepAliveTime都没有获得任务,则将超时标志设为true,在下一次循环时进行判断,如果发现上一次获取任务发生超时,则立刻返回null,这时worker线程主循环将正常结束,并移除结束的worker。
9-shutdown和shutdownNow以及awaitTermination方法详解
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); // 获取全局锁 try { checkShutdownAccess(); // 调用线程权限检查 advanceRunState(SHUTDOWN); // 将runState状态设为SHUTDOWN interruptIdleWorkers(); // 中断所有空闲状态的线程,通过获取Worker的锁来实现 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); // 尝试终止,执行terminated hook方法 } public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); //将runState状态设为SHUTDOWN interruptWorkers(); // 直接调用worker的中断方法,对所有worker进行中断 tasks = drainQueue(); // 队列 --> 集合 } finally { mainLock.unlock(); } tryTerminate(); return tasks; } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { // 如果所有提交的任务已经完成,则立刻返回true if (runStateAtLeast(ctl.get(), TERMINATED)) return true; // 已经超时,则返回false if (nanos <= 0) return false; // 进入等待,直到被通知、中断、超时,则返回剩余的间, // 如果返回值小于等于0,则表示是超时返回 nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
总结:两个方法都是通过调用worker的interrupt来中断工作线程,但是线程如果没有响应中断(task.run()方法没有感知到中断的发生,而且也没有抛出异常),则任务不可能被立即终止。如果想要响应中断,可以在task.run()方法中设计成响应中断,然在下一次循环中判断中断标志来终止工作线程中。
-
shutdown方法将状态设置为SHUTDOWN,并拒绝新增任务,之前提交的任务如果还没有开始则不会被执行;调用isShutdown返回true,但是调用isTerminaed返回false,关闭所有空闲的线程,正在执行的线程将会将任务执行完成。
-
shutdownNow方法将状态设置为STOP,并拒绝新增任务,之前提交的任务如果还没有开始则不会被执行;调用isShutdown返回true,但是调用isTerminaed返回true,关闭所有空闲的线程和正在执行的线程,所以有的任务已经开始但是可能不会完成(将丢失部分任务),并将之前提交的没有开始执行的任务列表返回。
-
awaitTermination方法将状态设置为TERMINATED,并拒绝新增任务,调用isShutdown返回true,但是调用isTerminaed返回false,超时等待所有提交的任务的完成。
10- 方法列表总览
如何合理配置线程池大小?
Java线程底层映射到操作系统原生线程,而且Java在windows和linux平台下,一个Java线程映射为一个内核线程,而内核线程和CPU物理核心数一样,所以Java线程和CPU核心是一对一的关系,将线程池的工作线程设置为与物理核心相等能做到真正的线程并发,如果设置线程数多于核心则会在核心线程之间不停的切换。
一般需要根据任务的类型来配置线程池大小:
- 如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 Num(CPU+1)
- 如果是IO密集型任务,参考值可以设置为2Num(CPU)*
当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。
使用线程池的建议
- 建议使用有界队列,有界队列能增加系统的稳定性和预警能力,防止资源过度消耗,撑爆内存,使得系统崩溃不可用。
- 提交到线程池的task之间要尽量保证相互独立,不能存在相互依赖,否则可能会造成死锁等其他影响线程池执行的原因。
- 提交到的线程池的task不要又创建一个子线程执行别的任务,然后又将这个子线程任务提交到线程池,这样会造成混乱的依赖,最终导致线程池崩溃,最好将一个task用一个线程执行。
最后
以上就是健康夏天最近收集整理的关于全面分析线程池源码线程池-ThreadPoolExecute源码分析的全部内容,更多相关全面分析线程池源码线程池-ThreadPoolExecute源码分析内容请搜索靠谱客的其他文章。
发表评论 取消回复