ThreadPoolExecutor线程池详解与使用
一、ThreadPoolExecutor详解
阿里巴巴Java开发规范中,规定了线程池不允许使用Executors创建线程池,而是通过ThreadPoolExecutor
创建线程池,能够深入理解ThreadPoolExecutor相关参数的设置。
ThreadPoolExecutor构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
参数说明如下:
参数 | 说明 |
---|---|
int corePoolSize | 核心线程数,线程池中始终保留的线程(即使线程处于空闲状态)。 |
int maximumPoolSize | 线程池中最大线程数 |
long keepAliveTime | 线程数大于核心线程数时,空闲状态线程的存活时间 |
TimeUnit unit | 存活时间的单位 |
BlockingQueue<Runnable> workQueue | 在任务执行之前用来保存任务的队列。这个队列将只保存由execute方法提交的Runnable任务。 |
ThreadFactory threadFactory | 创建新线程时所使用的线程工厂 |
RejectedExecutionHandler handler | 当线程和阻塞队列已满时的拒绝策略。 |
1. corePoolSize && maximumPoolSize && poolSize
-
poolSize
:当前线程中线程数。 -
corePoolSize
:核心线程数。 -
maximumPoolSize
:最大线程数。
场景1:poolSize <corePoolSize
,workQueue未满,此时新提交一个任务,则会新增一个线程来执行任务。
场景2:poolSize>=corePoolSize
,workQueue未满,此时新提交一个任务,则会将任务存入阻塞队列中。
场景3:poolSize>=corePoolSize
,workQueue已满,poolSize < maximumPoolSize,此时新提交一个任务,则会新增一个线程来执行任务,当线程空闲后,根据keepAliveTime决定线程存活。
场景4:poolSize>=corePoolSize
,workQueue已满,poolSize = maximumPoolSize,此时新提交一个任务,则根据RejectedExecutionHandler拒绝策略决定如何处理提交的任务。
2. keepAliveTime && unit
空闲线程存活时间与时间单位。
unit单位:
TimeUnit.DAYS
:天TimeUnit.HOURS
:小时TimeUnit.MINUTES
:分钟TimeUnit.SECONDS
:秒TimeUnit.MILLISECONDS
:毫秒TimeUnit.NANOSECONDS
: 毫微秒TimeUnit.MICROSECONDS
:微秒
3. BlockingQueue workQueue
SynchronousQueue
:直接提交。该队列没有容量,总是将任务提交线程执行。ArrayBlockingQueue
:有界队列。若线程数等于最大线程数,且有界队列已满,则执行拒绝策略。LinkedBlockingQueue
:无界队列。除非系统资源耗尽,否则队列会无限添加。这种队列下,最大线程数将永不会到达。PriorityBlockingQueue
:特殊无界队列。可根据任务优先级顺序先后执行任务。
4. ThreadFactory threadFactory
线程工厂,用来创建线程。为了统一在创建线程时设置一些参数,如优先级、是否守护线程等,通过自定义线程工厂创建的线程能保证具有相同的特性。
1
2
3
4
5
6
7
8
9
10
11public class ThreadFactoryDemo implements ThreadFactory { private int id = 0; public Thread newThread(Runnable r){ Thread t = new Thread(r); t.setName("Thread-Name:" + id++); //设置线程为守护线程 t.setDaemon(true); return t; } }
5. RejectedExecutionHandler handler
在阻塞队列为有界队列时,当队列满了后,添加的任务就会存在问题,于是为了处理满队列情况下的任务有以下几种策略可选:
AbortPolicy
线程池默认拒绝策略。当队列满了,丢弃任务并抛出RejectedExecutionException
异常。
DiscardPolicy
AbortPolicy
的降级版本,当队列满了,丢弃任务不抛出异常。
DiscardOldestPolicy
当队列满了,丢弃最老的任务,腾出空间并尝试再次执行execute()
加入队列。
CallerRunsPolicy
当队列满了,不使用子线程执行任务,直接主线程执行.run()方法。
- 自定义
自定义拒绝策略:
1.创建一个类,实现RejectedExecutionHandler
接口。
2.重写rejectedExecution(Runnable r, ThreadPoolExecutor executor)
方法
3.rejectedExecution
方法内编写拒绝策略逻辑
二、ThreadPoolExecutor使用
ThreadPoolExecutor
的使用方法大致如下:
- 定义
ThreadPoolExecutor
- 在代码中执行线程池的
execute()
方法 execute()
方法传参为Runnable command
,可以定义匿名内部类或自定义一个实现Runnable
类处理
具体DEMO
接收来自外部请求的数据,客户需求:5s内响应内容,超时熔断。
剖析代码:
1.校验数据部分
2.校验通过进行数据存储
3.存储完毕,响应必要数据
分析代码:
剔除【数据存储】部分,必要的校验数据部分响应时间在:4669ms 1289ms 805ms 805ms 763ms,平均在1666ms。
优化计划:
将【数据存储】部分交由线程池处理,当【数据校验】通过则可将必要响应结果直接响应客户。
1. 定义ThreadPoolExecutor
1
2
3
4
5
6
7
8public class XXXServiceImpl implements XXXService { /** * 线程池(核心线程2,最大线程4,空闲线程存活时间0,时间单位SECONDS,阻塞队列ArrayBlockingQueue(20)) */ private static final ExecutorService threadPoolExecutor = new ThreadPoolExecutor(2,4,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(20)); }
2. 在代码中执行线程池的execute方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public class XXXServiceImpl implements XXXService{ @Override public Map doTask(XXObject object) { /* do something... 如必要数据校验等 */ String errorMsg = check(object); if(StringUtils.isNotEmpty(errorMsg)){ return ResultUtils.getFaildResultData(errorMsg); } //必要响应结果:订单号 String orderNo = createUUid(); object.setOrderNo(orderNo); //校验通过 使用多线程处理数据存储,直接返回响应 ReceiveMessageRunnable receiveMessageRunnable = new ReceiveMessageRunnable(object); threadPoolExecutor.execute(receiveMessageRunnable); return ResultUtils.getSuccessResultData(orderNo); } }
3. 定义任务类实现Runnable接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20private class ReceiveMessageRunnable implements Runnable { private XXObject object; public ReceiveMessageRunnable(XXObject object) { this.object = object; } @Override public void run() { try{ /** * 执行数据存储 */ }catch (Exception e){ //出现异常,则将数据存入统一异常日志中 便于排查问题 insertException(object, e); } } }
这里有个优化点:数据存储可能有一定的可能存储失败,若一定时间大量数据存储失败,修复是个问题。因此可以使用MQ队列重试机制处理该情况。
最后
以上就是忧虑泥猴桃最近收集整理的关于ThreadPoolExecutor线程池详解与使用ThreadPoolExecutor线程池详解与使用的全部内容,更多相关ThreadPoolExecutor线程池详解与使用ThreadPoolExecutor线程池详解与使用内容请搜索靠谱客的其他文章。
发表评论 取消回复