我是靠谱客的博主 现代小兔子,这篇文章主要介绍ThreadPoolExecutor线程池源码(二) - 核心源码,现在分享给大家,希望可以做个参考。

execute

源码就直接从execute方法开始看了,初始化的就跳过了

在这里插入图片描述

复制代码
1
2
java.util.concurrent.ThreadPoolExecutor#execute

execute是提交任务的方法,我觉得核心的逻辑就在这个方法中

  1. 在提交任务的时候,会判断当前线程池工作线程的数量是否小于corePoolSize,,如果小于,就添加核心线程

  2. 如果工作线程数量大于corePoolSize,或者是添加核心线程失败,就判断线程池是否是running状态,如果是,将任务添加到任务队列中
    2.1、添加到任务队列之后,再次判断线程池是否是running状态,如果是非running状态,就将任务从任务队列中remove,并执行拒绝策略
    2.2 如果依旧是running状态,就判断线程池工作线程数量是否为0,为0,就添加一个空任务,推进线程池的执行

  3. 如果添加任务队列失败,就尝试添加非核心线程,去处理

  4. 如果非核心线程添加失败,就执行拒绝策略

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); /** * 检查当前线程池中的线程数是否达到了常驻线程数,如果未达到,就添加一个核心线程,如果添加失败,执行下面的 * 如果添加成功,返回即可 * * addWorker方法的第二个参数,表示限制添加线程的数量是根据corePoolSize?还是maximumPoolSize来判断 * true:根据corePoolSize来判断,也就是说创建的是核心线程 * false:根据maximumPoolSize来判断,创建的是非核心线程 * * 核心的代码,就是addWorker这个方法 */ if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; /** * 添加失败之后要重新获取当前线程池中的线程数量 * 失败的原因可能是:其他并发创建了线程,导致线程数量超过了核心线程数量 * 或者是线程池被关闭等 */ c = ctl.get(); } /** * 如果当前线程池处于运行状态,且将任务添加到了队列中 * * 如果当前线程不是运行状态,就不允许往队列中添加任务 */ if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); /** * 1.判断当前线程池是否是running状态,如果不是运行状态,就remove刚才添加的任务 * 移除成功之后,执行拒绝策略 * 因为有可能在任务插入到队列的过程中,把线程池停掉了 * * 2.如果线程依旧是运行状态,就判断当前线程池中的线程的数量 * * 这种场景:有可能是设置允许核心线程超时,这样的话,核心线程在处理完所有的任务之后,就会销毁,也就是说:任务队列刚执行完,线程都已经销毁了,但是我又写入了一个任务 * * 如果线程池中核心线程和非核心线程的数量为0,就添加一个空任务到线程池中 * 这里添加一个空的任务,是为了让线程池去队列中取刚才放进去的任务执行,也就是启动一个线程,去执行getTask() */ if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } /** * 如果线程池是非运行状态,或者是队列已经满了,插入任务失败,就会把当前任务放入到线程池中,用非核心线程去执行任务 * 如果任务写入线程池失败,就执行拒绝策略 */ else if (!addWorker(command, false)) reject(command); }

在execute方法中,其实最核心的方法是addWorker()这个方法

addWorker方法,第一个参数是当前要提交的任务(也就是我们自己的业务逻辑),第二个参数是boolean类型的,为true,表示是核心线程,为false,表示是非核心线程
其实在线程池中,并不是说哪个线程创建了,就打标为核心线程,超过了corePoolSize之后,创建的线程都打标为非核心线程,在没有看源码之前,我一直认为是有一个标识来表示当前线程是核心还是非核心线程,其实是没有的
只是有一个线程数量的判断,我们可以认为,如果线程数量没有超过corePoolSize,创建的就都是核心线程,超过的,都是非核心线程,在源码中,是没有任何区别的

addWorker

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/** * 这是最核心的代码 * 可以简单的理解为分为了两大步骤 * 1.将核心线程数或者非核心线程数 + 1 * 2.添加一个worker对象,worker对象中维护着一个线程 * @param firstTask * @param core * @return */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { /** * 这里是一个死循环,如果不return或者break,会一直循环 * 这个死循环中,会判断两个内容: * 1.线程池状态 * 2.线程池中线程数量 * * 这里的c:是ctl对应的value * rs:是当前线程池的状态 */ int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /** * 判断线程池状态和任务队列是否满足要求,不满足返回false,满足继续执行 * * 1.如果线程池状态大于running,且不是shutdown状态,就直接return,因为rs > shutdown的时候,是不允许添加任务的,也不会执行线程池中的任务 * 2.如果线程池状态大于running,且是shutdown状态,就需要再判断下,如果添加的任务不为null,就return false,因为shutdown状态不允许添加任务 * 如果是shutdown状态,且添加的任务是null,且任务队列为空,就不允许添加worker,因为此时添加一个空worker,是没有意义的 * 所以:如果是shutdown状态,且添加的任务是null,但是任务队列不为空,此时是可以继续添加worker的,此时添加的worker,就是启动一个线程,去执行任务队列中的任务 * * 所以,这个判断总结而言: * 1.如果rs > shutdown,就不允许添加 * 2.如果rs = shutdown,且任务队列不为空,且添加的worker中的任务是null,此时可以添加 * * 我们也可以认为:不进入这个if条件的,只有两种情况 * 1.线程池是running状态 * 2.线程池是shutdown状态,且任务队列不为空,且firstTask(新添加的任务)为null */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { /** * 获取到当前线程池中的工作线程 */ int wc = workerCountOf(c); /** * 判断当前线程池中的线程数量,是否超过了最大容量 * 或者是corePoolSize或者maximumPoolSize * 如果超过了,就返回false,表示添加失败 */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; /** * 这里是将线程池中的线程数量+1 * 如果cas失败,就继续循环 */ if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { /** * 根据当前要提交的任务,声明一个worker对象 * worker对象是继承了AQS * 正常情况下,创建了worker对象之后,绑定的thread,是不为null的,然后此时会进行加锁 * 因为在往线程池中添加任务的时候,有可能是并发的 */ w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); /** * 这里还要对线程池状态做一次判断: * 1.如果线程池是running状态 * 2.线程池是shutdown状态,但是任务队列中有未执行完成的任务,且线程池中的线程数量为0 * 这两个条件,满足任意一个,就可以新建一个线程 */ if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); /** * 上面这个if应该就是判断线程池以及线程是否正常 * 如果正常,就把当前worker添加到一个set集合中 * largestPoolSize:应该是表示线程池中历史中出现的最大线程数量 */ workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } /** * 如果worker正常添加到了set中,那就会去执行worker的run方法 * 这里是启动了worker中的线程,worker中的线程,会去执行run方法 */ if (workerAdded) { t.start(); workerStarted = true; } } } finally { /** * 如果启动失败,就会执行下面的方法,将worker从worker集合中remove * 将线程池中线程数量 - 1 */ if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

在addWorker()方法中,有几个关键点
1、判断当前线程池状态是否允许添加任务,是否可以执行任务队列中的任务
2、判断当前线程池中工作线程数量是否超过了corePoolSize或者maximumPoolSize
3、然后将线程包装为worker对象
在这里插入图片描述

worker对象

我们在线程池中添加线程的时候,会将线程包装成分worker对象,这里根据我目前学习的知识,这里之所以包装为worker对象,是为了防止在初始化线程的时候,线程被中断,只有在开始准备执行任务的时候,会把线程设置为可中断

这里的是否可中断,是根据 AQS中的state变量来控制的

在这里插入图片描述
可以看到,worker对象继承了AQS对象,这里继承AQS,就是为了用state变量

初始化worker对象

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/** * 这里要注意的是:我们一直所说的线程池中的线程,其实是包装在worker对象中,也就是说一个线程对应一个worker对象 * 那为什么要把线程包装在worker对象中,这里我觉得是为了控制线程在初始化的过程中,不可被中断,因为我发现,只有设置了一个state变量 * 这个变量就是来控制是否允许被中断的,在初始化的时候,设置为-1,如果此时来中断线程,就会加锁失败 * 如果初始化完成,开始执行业务逻辑代码,就把state变量设置为0,然后此时就可以中断了 * * * * 1.将aqs中的state变量设置为-1;这里设置为-1和线程池的中断有关系,设置为-1,表示当前线程池是不允许中断的 * 在后面开始执行程序员添加的任务时,会设置为允许中断,也就是将state设置为0 * 因为在尝试中断线程的时候,是会判断state是否 >= 0的,只有 >= 0时,才允许去调用interrupt方法 * * 2、从线程工厂中得到一个线程,绑定到worker中 * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }

执行提交的任务

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; /** * 这里的unlock,是将state设置为0,表示当前任务是可以中断的 */ w.unlock(); // allow interrupts boolean completedAbruptly = true; try { /** * 线程池在执行完一个任务之后,会接着从队列中获取其他任务进行执行,就是在下面这个while条件中实现的 * 下面的getTask()极速从队列中take任务,所以: * 当前线程在处理完任务之后,会接着从任务队列中获取任务执行 * * 这个while条件的意思是: * 如果程序员执行的task不为null,就运行程序员的任务 * 如果为null,就尝试从任务队列中获取排队的任务执行 */ while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt /** * 如果线程池状态 >= stop需要中断线程 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { /** * 这里运行的就是程序员执行的任务方法 */ task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; /** * 将当前线程完成的任务数量 +1 */ w.completedTasks++; w.unlock(); } } /** * 如果代码跳出while循环,表示当前线程超时,或者是任务队列中没有线程了 * 就可以销毁当前线程了 */ completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

在前面addWorker()方法中,如果将新增的worker对象添加到集合中之后,会启动worker对象中的线程,也就是worker对象中的run()方法,run()方法就是调用runWorker()方法
在该方法中,主要有以下几个核心方法
1、getTask()从任务队列中获取排队的任务,在第一次执行runWorker()的时候,是不会调用该方法的,除非提交的task为null,正常情况下,提交的task是不为null的
2、task.run(),执行任务的具体业务逻辑
3、processWorkerExit(),在从任务队列中没有获取到任务的时候,是会执行该方法,销毁当前线程,并将worker对象销毁

getTask()

获取阻塞队列中的任务

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/** * 这是从任务队列中获取任务 * 结束该方法的运行有四种场景 * 1.线程池状态 >= STOP 或者线程池是SHUTDOWN 且任务队列为null,此时会将wc -1,返回null,结束运行 * 2.当前线程数量超过了最大的阈值 * 3.线程数量超过了corePoolSize,且超过指定的时间之后依旧没有获取到任务 * 4.正常获取到了队列中的任务,此时就返回任务,执行该任务方法 * * 关于第一点:在stop以及之上的状态,是不会执行任务队列中的任务的,这是线程池的规定 * 如果是shutdown状态,还是可以执行任务队列中任务,但是不会新增任务,此时如果任务队列为null,就返回即可 * 关于第二点和第三点,有一个共同的前提: * 任务队列为空 或者 线程数量 > 1 * * @return */ private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /** * 判断当前线程池状态: * 如果线程池状态是大于shutdown,而且任务队列为空 * 或者线程池状态是大于stop的,就将worker集合中的worker进行remove,因为 * 如果是大于shutdown状态,并且任务队列为空,此时就不需要worker了,因为此时不会再接收新的任务 * 如果是大于stop状态,无论任务队列是否为空,都不会执行,所以 remove即可 * 需要注意的是:这里只是将线程池中线程数量减1 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? /** * allowCoreThreadTimeOut 默认为false * wc:表示当前线程池中工作的线程数量 * * 如果程序员设置了核心线程允许超时,或者是线程数量超过了corePoolSize,就把timed设置为true * 表示超时了 */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /** * 这里其实就是我们常说的:如果非核心线程空闲时间超过了设置的阈值,就会把非核心线程从线程池中remove的源码 * * timeOut表示线程已经超时了 * * 这里判断wc > 1我觉得是为了保证下面能够将数量 -1 */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { /** * 在超时或者是任务队列为空的时候,表示当前线程可以推出了,就会把线程池中的工作线程数量 - 1 */ if (compareAndDecrementWorkerCount(c)) return null; continue; } try { /** * 我们常说:非核心线程在空闲一定的事件之后,就会被回收 * 如果线程空闲时间超过了keepAliveTime,就会把timeOut设置为true,在上面的if判断中,就会将线程池中线程数 -1,然后return null * * 如果获取到的任务是null,就继续下一次循环,并将timeOut设置为true,表示已经超时 * 如果获取到的任务不是null,就return,执行任务 * * 这里会根据timed来判断是通过哪种方式获取任务 * 如果timed为true(wc > corePoolSize),就通过poll来获取任务,在指定时间没有获取到任务,就退出,将线程销毁 * 如果timed为false,表示此时线程数量 <= corePoolSize,就会一直阻塞,知道获取到任务为止 */ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }

这个方法主要完成的就是两个操作:
1、如果正常获取到任务队列中的任务,就返回任务
2、如果没有获取到,或者线程池状态发生了变化,就将当前线程池中线程数量 - 1;并返回null

worker线程退出

java.util.concurrent.ThreadPoolExecutor#processWorkerExit

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/** * 这里是将线程从线程池中remove的逻辑 * 在前面getTask()的时候,会先把线程数量进行-1 * 如果getTask()返回了任务,就不会进入到该方法,而是去执行任务 * 如果getTask()返回了null,就会执行这里的逻辑,返回null表示需要剔除一个线程 * @param w * @param completedAbruptly */ private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { /** * 这里是将当前线程池已经执行的任务数量 + worker.completedTasks * completedTaskCount:记录线程池已经执行的任务数量 * completedTasks:当前worker中的线程已经执行了的线程数量 */ completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } /** * 尝试终止线程 */ tryTerminate(); int c = ctl.get(); /** * 这里要判断线程池状态是否小于stop * 如果小于stop,表示可以执行任务队列中的任务 * 说明上面tryTerminate没有将线程池停止 */ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { /** * 下面这几行代码:只要不进入到return,就会去添加一个空任务提交到线程池 * 这里addWorker其实就是让线程池继续去处理线程池或者任务队列中的任务 * 我们来看下什么场景下会执行addWorker?也就是wc < min * min有两种值(1或者corePoolSize) * 也就是说: * 如果当前线程池中的线程数量大于min(corePoolSize或者1),return即可,无需处理 * 如果不满足条件,就需要addWorker 添加一个空任务,触发getTask()的逻辑 * * 所以:就是下面两种场景需要添加一个空任务,去启动一个线程,执行任务队列中的任务 * 1.当前线程允许超时,且任务队列不为空,但是此时wc < 1 * 2.当前线程不允许超时,且wc < corePoolSize,此时就需要添加一个worker * * 其他情况下,就不需要再去创建新的线程了 */ int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }

这个方法中,只有一个点需要注意下:最后面一行代码,会添加一个空任务,添加空任务是为了让线程去执行任务队列中的排队任务

结论

对于线程池来说
1、核心线程和非核心线程没有什么区别,都会去任务队列中执行任务
2、线程池中的线程是包装在worker对象中,worker对象是为了保证在初始化线程的过程中不会被中断
3、worker对象继承了AQS,通过state来控制不允许被中断
4、在超过指定的时间之后,依旧没有获取到任务,就会把当前线程销毁

最后

以上就是现代小兔子最近收集整理的关于ThreadPoolExecutor线程池源码(二) - 核心源码的全部内容,更多相关ThreadPoolExecutor线程池源码(二)内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(87)

评论列表共有 0 条评论

立即
投稿
返回
顶部