我是靠谱客的博主 漂亮期待,这篇文章主要介绍有关线程池,你必须要知道的几个细节1、 线程池中线程的排队优先级2、 线程池中线程的执行优先级  3、 线程池中线程的复用4、 ThreadPoolExecutor 线程池runState和workerCount的存储5、 线程的状态6、 线程池的状态7、 线程池中线程是如何获取任务的,线程的数量是何时减少的,怎么减少的?,现在分享给大家,希望可以做个参考。

1、 线程池中线程的排队优先级

        线程池按以下行为执行任务

            1). 当线程数小于核心线程数时,创建线程。

            2). 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。

            3). 当线程数大于等于核心线程数,且任务队列已满

                - 若线程数小于最大线程数,创建线程

                - 若线程数等于最大线程数,抛出异常,拒绝任务

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int c = ctl.get(); //当工作线程数量小于核心线程数时,创建新线程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);

 

           

addWorker方法

源码比较长,看起来比较唬人,其实就做了两件事。1)才用循环CAS操作来将线程数加1;2)新建一个线程并启用。

源码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
private boolean addWorker(Runnable firstTask, boolean core) { //(1)循环CAS操作,将线程池中的线程数+1. retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //core true代表是往核心线程池中增加线程 false代表往最大线程池中增加线程 //线程数超标,不能再添加了,直接返回 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS修改clt的值+1,在线程池中为将要添加的线程流出空间,成功退出cas循环,失败继续 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl //如果线程池的状态发生了变化回到retry外层循环 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //(2)新建线程,并加入到线程池workers中。 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //对workers操作要通过加锁来实现 final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //细化锁的力度,防止临界区过大,浪费时间 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int c = ctl.get(); int rs = runStateOf(c); //判断线程池的状态 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //判断添加的任务状态,如果已经开始丢出异常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //将新建的线程加入到线程池中 workers.add(w); int s = workers.size(); //修正largestPoolSize的值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //线程添加线程池成功,则开启新创建的线程 if (workerAdded) { t.start();//(3) workerStarted = true; } } } finally { //线程添加线程池失败或者线程start失败,则需要调用addWorkerFailed函数, //如果添加成功则需要移除,并回复clt的值 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

2、 线程池中线程的执行优先级  

        core线程中的任务先执行,再者是非核心线程中的任务,当传入的任务是null时才从任务队列中获取任务。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/** * Main worker run loop. Repeatedly gets tasks from queue and * executes them, while coping with a number of issues: * * 1. We may start out with an initial task, in which case we * don't need to get the first one. Otherwise, as long as pool is * running, we get tasks from getTask. If it returns null then the * worker exits due to changed pool state or configuration * parameters. Other exits result from exception throws in * external code, in which case completedAbruptly holds, which * usually leads processWorkerExit to replace this thread. * * 2. Before running any task, the lock is acquired to prevent * other pool interrupts while the task is executing, and then we * ensure that unless pool is stopping, this thread does not have * its interrupt set. * * 3. Each task run is preceded by a call to beforeExecute, which * might throw an exception, in which case we cause thread to die * (breaking loop with completedAbruptly true) without processing * the task. * * 4. Assuming beforeExecute completes normally, we run the task, * gathering any of its thrown exceptions to send to afterExecute. * We separately handle RuntimeException, Error (both of which the * specs guarantee that we trap) and arbitrary Throwables. * Because we cannot rethrow Throwables within Runnable.run, we * wrap them within Errors on the way out (to the thread's * UncaughtExceptionHandler). Any thrown exception also * conservatively causes thread to die. * * 5. After task.run completes, we call afterExecute, which may * also throw an exception, which will also cause thread to * die. According to JLS Sec 14.20, this exception is the one that * will be in effect even if task.run throws. * * The net effect of the exception mechanics is that afterExecute * and the thread's UncaughtExceptionHandler have as accurate * information as we can provide about any problems encountered by * user code. * * @param w the worker */ final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

        addWorker会用当前task创建一个Worker对象,相当于对task的包装,然后用Worker对象作为task创建一个Thread,该Thread保存在Worker的thread成员变量中。在addWorker中启动了这个线程,线程中执行runWorker方法。

先看注释:

1.首先取传入的task执行,如果task是null,只要该线程池处于运行状态,就会通过getTask方法从workQueue中取任务。ThreadPoolExecutor的execute方法会在无法产生core线程的时候向workQueue队列中offer任务。

getTask方法从队列中取task的时候会根据相关配置决定是否阻塞和阻塞多久。如果getTask方法结束,返回的是null,runWorker循环结束,执行processWorkerExit方法。

至此,该线程结束自己的使命,从线程池中“消失”。

2.在开始执行任务之前,会调用Worker的lock方法,目的是阻止task正在被执行的时候被interrupt,通过调用clearInterruptsForTaskRun方法来保证的(后面可以看一下这个方法),该线程没有自己的interrupt set了。

3.beforeExecute和afterExecute方法用于在执行任务前后执行一些自定义的操作,这两个方法是空的,留给继承类去填充功能。

我们可以在beforeExecute方法中抛出异常,这样task不会被执行,而且在跳出该循环的时候completedAbruptly的值是true,表示the worker died due to user exception,会用decrementWorkerCount调整wc。

4.因为Runnable的run方法不能抛出Throwables异常,所以这里重新包装异常然后抛出,抛出的异常会使当当前线程死掉,可以在afterExecute中对异常做一些处理。

5.afterExecute方法也可能抛出异常,也可能使当前线程死掉。

3、 线程池中线程的复用

 

线程池的execute的作用是把任务放到等待队列中或者新建worker并把任务放到worker的firstTask,最后执行worker中的thread;

Worker中的thread的start方法会执行Worker的run方法;

Worker的run方法会调用线程池的runWorker方法;

runWorker方法则是调用worker的firstTask的run方法,达到目的;

好处就是可以重复利用Worker与Worker中的thread,这也是线程池的优势。

4、 ThreadPoolExecutor 线程池runState和workerCount的存储

         ThreadPoolExecutor是经常使用的线程池类。线程池有5种状态:RUNNING / SHUTDOWN / STOP / TIDYING / TERMINATED。这五种状态是如何存储的呢?通过阅读jdk源码发现,其内部使用了一个AtomicInteger字段同时存储了线程池状态和有效线程数量,实现很巧妙。

相关源代码如下(解释见注释):

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//ctl一个变量同时存储runState和workerCount,其中runState占用高3位,workCount占用低29位 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //workerCount使用的位数:32-3=29位 private static final int COUNT_BITS = Integer.SIZE - 3; //workerCount最大值:536870911,即0b00011111_11111111_11111111_11111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState存储在高位,占用3位 //0b11100000_00000000_00000000_00000000 private static final int RUNNING = -1 << COUNT_BITS; //0b00000000_00000000_00000000_00000000 private static final int SHUTDOWN = 0 << COUNT_BITS; //0b00100000_00000000_00000000_00000000 private static final int STOP = 1 << COUNT_BITS; //0b01000000_00000000_00000000_00000000 private static final int TIDYING = 2 << COUNT_BITS; //0b01100000_00000000_00000000_00000000 private static final int TERMINATED = 3 << COUNT_BITS; //~CAPACITY=0b11100000_00000000_00000000_00000000 // 获取runState,即保留ctl的高3位,后29位置0 private static int runStateOf(int c) { return c & ~CAPACITY; } //获取workerCount,即保留ctl的低29位,高3位置0 private static int workerCountOf(int c) { return c & CAPACITY; } //设置ctl,或操作 private static int ctlOf(int rs, int wc) { return rs | wc; }

使用如下一段简单代码,验证一下相关值:

复制代码
1
2
3
4
5
6
7
8
9
10
11
public static void main( String[] args ) { ExecutorService pool = Executors.newSingleThreadExecutor(); Runnable r = new Runnable() { public void run() { System.out.print("running"); } }; pool.submit(r); pool.shutdown(); }

新建ThreadPoolExecutor时ctl值如图:

submit一个runnalbe以后的值如下图:

由于只使用了32位中的29位存储workerCount,而此值指的是线程池中允许开始运行但没有结束的线程数,这和整个线程池中的活动线程数并不相同。如果代码中会有超过5百多万线程(536870911)加入池中并允许开始执行,就要注意溢出的风险。

五种状态中,只有RUNNING状态时最高位为1,为负数,所以检查状态时通过判断ctl是否大于0,就可以知道是否处于此状态。其他4种状态最高位为0,都为正数。

5、 线程的状态

  1. new 新建一个状态值但还未启动。
  2. Runable Ruanable 包括了操作系统的线程状态中的running和ready,也就是处于此状态底下的线程有可能正在运行或者正在等在CPU分配时间片。
  3. waitting 无线等待期: 处于这种情况下的线程不会被cpu分配时间片,他们要等待其他线程的显式唤醒才可以。
  • 没有设置timeOut 参数的Object.wait()方法。
  • 没有设置TimeOut参数的Thread.join()方法。
  • LockSupport.park()方法。
  1. Timed waiting 限期等待: 处于这种状态的线程也不会被分配时间片。但是他无需其他线程唤醒,在一定的时间之后他们会被分配CPU执行的时间
  • Thread.sleep
  • Object.wait()设置了TimeOut
  • Thread.join()设置了TimeOut
  • LockSupport.parkNanous()
  • LockSupport.parkUtil()
  1. blocked阻塞: 线程被阻塞了,阻塞状态和等待状态的区别在于,阻塞是在等待获取到一个排他锁而等待是在等待另一个线程的唤醒或者等待一段时间。这种一般会在使用sychronnized的时候会发生
  2. Terminated结束: 已终止线程的线程状态,线程已结束执行。 上面的描述了6种但是其中有两种都是等待所以说是5种状态值。但是runable 也有区分两种 running 和 ready。所以也可说是6种吧(但是在深入理解java虚拟机一书中有描述是5种)

                                   

6、 线程池的状态

在java中线程池的实现的主类是通过ThreadPoolExcutor这个类来实现的, 线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量 (workerCount)。在具体实现中,线程池将运行状态(runState)、线程数量 (workerCount)两个关键参数的维护放在了一起,如下代码所示

复制代码
1
2
3
private static int runStateOf(int c) { return c & ~CAPACITY; } //计算当前运行状态 private static int workerCountOf(int c) { return c & CAPACITY; } //计算当前线程数量 private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数生成ctl

这篇文章主要说的是状态值所以说java线程池有一下几种状态值:

                                   

其生命周期转换如下入所示:

几种状态值的转换

                                     

7、 线程池中线程是如何获取任务的,线程的数量是何时减少的,怎么减少的?

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
1.介绍 getTask()方法是工作线程在while死循环中获取任务队列中的任务对象的方法 2.源码 private Runnable getTask() { // 记录上一次从队列中拉取的时候是否超时 boolean timedOut = false; // Did the last poll() time out? // 注意这是死循环 for (;;) { int c = ctl.get(); // Check if queue empty only if necessary. // 第一个if:如果线程池状态至少为SHUTDOWN,也就是rs >= SHUTDOWN(0),则需要判断两种情况(或逻辑): // 1. 线程池状态至少为STOP(1),也就是线程池正在停止,一般是调用了shutdownNow()方法 // 2. 任务队列为空 // 如果在线程池至少为SHUTDOWN状态并且满足上面两个条件之一,则工作线程数wc减去1,然后直接返回null if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 跑到这里说明线程池还处于RUNNING状态,重新获取一次工作线程数 int wc = workerCountOf(c); // Are workers subject to culling? // timed临时变量勇于线程超时控制,决定是否需要通过poll()此带超时的非阻塞方法进行任务队列的任务拉取 // 1.allowCoreThreadTimeOut默认值为false,如果设置为true,则允许核心线程也能通过poll()方法从任务队列中拉取任务 // 2.工作线程数大于核心线程数的时候,说明线程池中创建了额外的非核心线程,这些非核心线程一定是通过poll()方法从任务队列中拉取任务 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 第二个if: // 1.wc > maximumPoolSize说明当前的工作线程总数大于maximumPoolSize,说明了通过setMaximumPoolSize()方法减少了线程池容量 // 或者 2.timed && timedOut说明了线程命中了超时控制并且上一轮循环通过poll()方法从任务队列中拉取任务为null // 并且 3. 工作线程总数大于1或者任务队列为空,则通过CAS把线程数减去1,同时返回null, // CAS把线程数减去1失败会进入下一轮循环做重试 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 如果timed为true,通过poll()方法做超时拉取,keepAliveTime时间内没有等待到有效的任务,则返回null // 如果timed为false,通过take()做阻塞拉取,会阻塞到有下一个有效的任务时候再返回(一般不会是null) Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take(); // 这里很重要,只有非null时候才返回,null的情况下会进入下一轮循环 if (r != null) return r; // 跑到这里说明上一次从任务队列中获取到的任务为null,一般是workQueue.poll()方法超时返回null timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } 3.分析 方法中,有两处十分庞大的if逻辑,对于第一处if可能导致工作线程数减去1直接返回null的场景有: 线程池状态为SHUTDOWN,一般是调用了shutdown()方法,并且任务队列为空。 线程池状态为STOP。 对于第二处if,逻辑有点复杂,先拆解一下: // 工作线程总数大于maximumPoolSize,说明了通过setMaximumPoolSize()方法减少了线程池容量 boolean b1 = wc > maximumPoolSize; // 允许线程超时同时上一轮通过poll()方法从任务队列中拉取任务为null boolean b2 = timed && timedOut; // 工作线程总数大于1 boolean b3 = wc > 1; // 任务队列为空 boolean b4 = workQueue.isEmpty(); boolean r = (b1 || b2) && (b3 || b4); if (r) { if (compareAndDecrementWorkerCount(c)){ return null; }else{ continue; } } 说明:这段逻辑大多数情况下是针对非核心线程。 在execute()方法中,当线程池总数已经超过了corePoolSize并且还小于maximumPoolSize时, 当任务队列已经满了的时候,会通过addWorker(task,false)添加非核心线程。 而这里的逻辑恰好类似于addWorker(task,false)的反向操作,用于减少非核心线程, 使得工作线程总数趋向于corePoolSize。如果对于非核心线程,上一轮循环获取任务对象为null, 这一轮循环很容易满足timed && timedOut为true, 这个时候getTask()返回null会导致Worker#runWorker()方法跳出死循环, 之后执行processWorkerExit()方法处理后续工作,而该非核心线程对应的Worker则变成“游离对象”, 等待被JVM回收。当allowCoreThreadTimeOut设置为true的时候,这里分析的非核心线程的生命周期终结逻辑同时会适用于核心线程。 4.keepAliveTime的意义 当允许核心线程超时,也就是allowCoreThreadTimeOut设置为true的时候,此时keepAliveTime表示空闲的工作线程的存活周期。 默认情况下不允许核心线程超时,此时keepAliveTime表示空闲的非核心线程的存活周期。

 

最后

以上就是漂亮期待最近收集整理的关于有关线程池,你必须要知道的几个细节1、 线程池中线程的排队优先级2、 线程池中线程的执行优先级  3、 线程池中线程的复用4、 ThreadPoolExecutor 线程池runState和workerCount的存储5、 线程的状态6、 线程池的状态7、 线程池中线程是如何获取任务的,线程的数量是何时减少的,怎么减少的?的全部内容,更多相关有关线程池,你必须要知道的几个细节1、内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(78)

评论列表共有 0 条评论

立即
投稿
返回
顶部