我是靠谱客的博主 干净啤酒,这篇文章主要介绍并发编程 Executor、ThreadPoolExecutor线程池学习总结,现在分享给大家,希望可以做个参考。

线程

线程是调度CPU资源的最小单位,线程模型分为KLT模型与ULT模型,JVM使用的KLT模型,Java线程与OS线程保持1:1的映射关系。

线程的六种生命状态:
复制代码
1
2
3
4
5
6
7
NEW: 新建 RUNNABLE: 运行 BLOCKED: 阻塞 WAITING: 等待 TIMED_WAITING: 超时等待 TERMINATED: 终结

状态切换如下图所示:
请添加图片描述

线程的创建方式:Thread,Runnable,Callable

1、继承Thread类,重写run()方法,.start()启动线程。
2、实现Runnable接口,重写run()方法,.start()启动线程。
3、实现Callable接口和Future,重写call()方法,通过FutureTask futureTask = new FutureTask(new MyCallable());方式包装Callable线程对象,futureTask可以了解任务执行情况,可取消任务的执行,还可获取执行结果,futureTask.start()启动线程。

Runnable和Callable的区别:
1、Callable规定的方法是call(),Runnable规定的方法是run().
2、Callable的任务执行有返回值,而Runnable的任务没有返回值
3、call()可以抛出异常,run()不可以

线程池

介绍:

如果并发的请求数量非常多,但每个线程执行的时间很短,会频繁的创建和销毁线程,这样会大大降低系统的效率。可能服务器在为每个请求创建新线程和销毁线程上花费的时间消和耗的系统资源要比处理用户请求的时间和资源更多。
线程池通过对多个任务重用线程,解决了线程生命周期的开销和资源不足问题。
需要处理的任务数量很大、单个任务处理时间比较短时可以使用线程池。

线程池五种生命状态:
复制代码
1
2
3
4
5
6
RUNNING = -1 << COUNT_BITS; //高3位为111 SHUTDOWN = 0 << COUNT_BITS; //高3位为000 STOP = 1 << COUNT_BITS; //高3位为001 TIDYING = 2 << COUNT_BITS; //高3位为010 TERMINATED = 3 << COUNT_BITS; //高3位为011

1、RUNNING

复制代码
1
2
3
状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理 状态切换:线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0

2、 SHUTDOWN

复制代码
1
2
3
状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN

3、STOP

复制代码
1
2
3
状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且中断正在处理的任务 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP

4、TIDYING

复制代码
1
2
3
4
5
状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态 当线程池变为TIDYING状态时,会执行钩子函数terminated(),执行完之后变成TERMINATED状态 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING

5、 TERMINATED

复制代码
1
2
3
状态说明:线程池彻底终止,就变成TERMINATED状态。 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

Executor:

Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法。
在这里插入图片描述
ThreadPoolExecutor 默认线程池
ScheduledThreadPoolExecutor 定时线程池

ThreadPoolExecutor

创建参数:

复制代码
1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

参数解释:
corePoolSize:核心线程数

复制代码
1
2
3
执行线程数超过corePoolSize后,继续提交的任务被保存到阻塞队列中,等待被执行; 如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

maximumPoolSize:最大线程数

复制代码
1
2
如果当前阻塞队列满了,还在继续提交任务,会根据最大线程数创建新的线程任务

keepAliveTime:线程最大空闲时间

复制代码
1
2
3
当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交 核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;

unit:最大空闲时间单位(TimeUnit)

workQueue:保存等待执行的任务的阻塞队列

复制代码
1
2
3
4
5
6
任务必须实现Runable接口,在JDK中提供了如下阻塞队列: 1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务; 2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量高于ArrayBlockingQuene; 3、SynchronousQuene:不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量高于LinkedBlockingQuene; 4、priorityBlockingQuene:优先级最高的无界阻塞队列;

threadFactory:创建线程的工厂

复制代码
1
2
3
4
它是ThreadFactory类型的变量,用来创建新线程。 默认使用Executors.defaultThreadFactory() 来创建线程。 使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。

handler:拒绝策略

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
如果当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,需要采取一种策略处理该任务 线程池提供了4种策略: 1、AbortPolicy:直接抛出异常,默认策略; 2、CallerRunsPolicy:用调用者所在的线程来执行任务; 3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务; 4、DiscardPolicy:直接丢弃任务; 上面的4种策略都是ThreadPoolExecutor的内部类。 也可以根据业务场景实现RejectedExecutionHandler接口,自定义策略 比如使用Redis缓存来存储不能处理的任务,用一个线程监控队列容量,少于一定比例后再拿来执行。
线程池监控方法:
复制代码
1
2
3
4
5
public long getTaskCount() //线程池已执行与未执行的任务总数 public long getCompletedTaskCount() //已完成的任务数 public int getPoolSize() //线程池当前的线程数 public int getActiveCount() //线程池中正在执行任务的线程数量
任务提交关闭方法:
复制代码
1
2
3
4
5
6
public void execute() //提交任务无返回值 public Future<?> submit() //用Future包装的提交方法,任务执行完成后有返回值 public void shutdown() //shutdown之后还能处理在队列中的任务 public List<Runnable> shutdownNow()//直接就将任务从队列中移除,线程池里的线程就不再处理了
execute执行流程图:

在这里插入图片描述

execute源码分析:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //clt记录着runState和workerCount int c = ctl.get(); //workerCountOf方法取出低29位的值(高3位用来记录线程状态),表示当前活动的线程数; //如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中; //并把任务添加到该线程中。 if (workerCountOf(c) < corePoolSize) { //第二个参数表示是根据核心线程数来判断还是非核心线程数来判断; //如果为true,根据corePoolSize来判断; //如果为false,根据maximumPoolSize来判断 //创建核心线程任务 if (addWorker(command, true)) return; //如果添加失败,则重新获取ctl值 c = ctl.get(); } //如果当前线程池是运行状态 && 核心线程满了,添加到队列成功 if (isRunning(c) && workQueue.offer(command)) { //重新获取ctl值 int recheck = ctl.get(); //如果不是运行状态,由于之前已经把command添加到workQueue中了, //这时需要移除该command //执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回 if (! isRunning(recheck) && remove(command)) reject(command); //如果 else if (workerCountOf(recheck) == 0) //创建非核心线程任务 addWorker(null, false); } //核心线程满了,队列也满了,就创建非核心线程 else if (!addWorker(command, false)) reject(command); }
addWork方法:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); //获取运行状态 int rs = runStateOf(c); //如果rs >= SHUTDOWN,则表示此时不再接收新任务 //接着判断以下3个条件,如果为false,表示队列中已经没有任务了,不需要再添加线程了 //1.rs == SHUTDOWN,表示关闭状态,不再接受新任务,但可以继续处理阻塞队列中的任务 //2.firsTask为空 //3.阻塞队列不为空 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //获取线程数 int wc = workerCountOf(c); //如果线程数超过CAPACITY,也就是ctl的低29位的最大值,返回false; //core是addWorker方法的第二个参数, //如果为true表示根据corePoolSize来比较 //如果为false则根据maximumPoolSize来比较。 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //尝试增加workerCount,如果成功,则跳出第一个for循环 if (compareAndIncrementWorkerCount(c)) break retry; //如果增加workerCount失败,则重新获取ctl的值 c = ctl.get(); //如果当前的运行状态不等于之前的运行状态,说明状态已改变,重新进行第一for循环 if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //根据firstTask,通过ThreadFactory来创建Worker对象 w = new Worker(firstTask); //每一个Worker对象都会创建一个线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); //rs < SHUTDOWN表示是RUNNING状态; //如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。 //因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //放到workers集合,workers是一个HashSet workers.add(w); int s = workers.size(); //largestPoolSize记录线程池中出现过的最大线程数量 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //启动线程,执行work的runWorker方法 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
runWorker方法:

通过循环+判断阻塞队列是否为空来保证线程可以重用

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //获取第一个任务 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); //是否因为异常退出循环 boolean completedAbruptly = true; try { //通过循环+条件判断来保证线程可以重用 //如果task为空,通过getTask来从队列获取任务 //getTask如果返回为空 while (task != null || (task = getTask()) != null) { //加锁,控制并发 w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //processWorkerExit会对异常和非异常的线程任务进行处理 //在线程池是RUNNING或者SHUTDOWN状态的前提下 //如果当前的工作线程由于抛出用户异常被终结,那么会新创建一个非核心线程执行此任务。 //如果当前的工作线程是正常情况下的终结,那么会有两种处理: //第一种:allowCoreThreadTimeOut为true,也就是允许核心线程超时的前提下 //如果任务队列空,则会通过创建一个非核心线程保持线程池中至少有一个工作线程。 //第二种:allowCoreThreadTimeOut为false,如果工作线程数大于corePoolSize直接返回 //否则创建一个非核心线程, //processWorkerExit()执行完毕之后,意味着该工作线程的生命周期已经完结。 processWorkerExit(w, completedAbruptly); } }
getTask方法:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
private Runnable getTask() { //表示上次从阻塞队列中取任务时是否超时 boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); //如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断: //1. rs >= STOP,线程池是否正在stop; //2. 阻塞队列是否为空。 //如果以上条件满足,则将workerCount减1并返回null。 //因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); //timed用于判断是否需要进行超时控制。 //allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时; //wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量; //对于超过核心线程数量的线程,需要进行超时控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法; //timed && timedOut,如果为true,表示需要进行超时控制,并且上次从队列中获取任务发生了超时 if ((wc > maximumPoolSize || (timed && timedOut)) //如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount-1; && (wc > 1 || workQueue.isEmpty())) { //如果-1失败,返回重试。 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制 //通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //如果r == null,说明已经超时,timedOut设置为true timedOut = true; } catch (InterruptedException retry) { //如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试 timedOut = false; } } }

线程池运行原理:

复制代码
1
2
3
4
5
6
7
8
9
10
11
线程池通过execute方法提交任务时 1、首先会判断当前线程池的线程数是否小于核心线程数 如果小于,那么就直接通过ThreadFactory创建一个Worker对象来执行这个任务 如果大于,会尝试把任务放入阻塞队列中。 任务执行完之后,线程会去阻塞队列中获取任务。 2、随着任务越来越多,队列如果满了,就会判断当前线程池里的线程数是否小于最大线程数 如果小于,就创建非核心线程,此时就算队列是满的,也会先执行新提交的任务。 如果已经达到了最大线程数,此时就会执行拒绝策略来处理这个任务 默认是AbortPolicy,丢弃该任务,抛异常。 3、如果线程池中的非核心线程达到了最大空闲时间,还没有获取到任务,就会销毁

线程池复用原理:

复制代码
1
2
3
4
5
6
7
8
9
线程在线程池内部其实是被封装成一个Worker对象,Worker继承了AQS,有锁的特性 execute调用addWorker方法创建一个Worker对象,把线程和任务一起封装到Worker内部 再调用runWorker方法来让线程执行任务,runWorker内部使用了while死循环 当一个任务执行完之后,会不断地判断并通过getTask方法从阻塞队列中获取任务 只要能获取到任务,就会调用run方法,继续执行任务,这就是线程能够复用的主要原因。 但是如果从getTask获取不到任务的时候,就调用finally中的processWorkerExit方法,将线程销毁。 因为Worker继承了AQS,每次在执行任务前都会调用lock方法,执行完任务之后,调用unlock方法 这样可以从Woker的加锁状态判断出当前线程是否正在运行任务

线程超时销毁原理:

在getTask方法中,会根据timed判断当前获取任务的线程是否可以超时销毁:

复制代码
1
2
3
4
5
6
7
8
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时销毁; //wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量; //如果有一个为true timed就为true 就表示可以超时销毁 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
复制代码
1
2
3
4
5
6
7
8
根据是否允许超时来选择调用阻塞队列workQueue的poll方法或者take方法 1、如果允许超时,会调用poll方法,传入空闲时间 如果线程达到了空闲还没有获取到任务,那么就会返回null,线程就会销毁。 2、如果不允许超时,会调用take方法,这个方法会一直阻塞获取任务,直到从队列中获取到任务为止。 3、因为allowCoreThreadTimeOut默认为false,如果线程数小于核心线程数,timed就会是false take方法又是阻塞的,所以核心线程进入到take中会一直被阻塞住,是不会被销毁的。 4、如果将allowCoreThreadTimeOut设置为true,所有线程走到这个timed都会是true,那么所有的线程,包括核心线程都可以做到超时销毁。

线程数量设置参考:
执行计算任务多的场景就是是CUP密集型:可以设置为CUP核数+1
IO多的场景,比如读文件、对数据库Redis的操作这种就是IO密集型:2✖️CPU核数+1
rocketmq、eureka、 nacos都是2✖️CPU的设置
设置为2✖️CPU后通过压测,可以得到最佳线程数 :
CPU核数 *((线程等待时间+线程执行时间) / 线程执行时间)

最后

以上就是干净啤酒最近收集整理的关于并发编程 Executor、ThreadPoolExecutor线程池学习总结的全部内容,更多相关并发编程内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(63)

评论列表共有 0 条评论

立即
投稿
返回
顶部