概述
介绍
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
线程池的优势:
线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
它的主要特点为:
线程复用 ;控制最大并发数;管理线程
• 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
• 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。
• 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
架构
Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类
线程池ThreadPoolExecutor参数说明
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
线程池状态
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
状态名 | 高 3 位 | 接收新任 务 | 处理阻塞队列任 务 | 说明 |
---|---|---|---|---|
RUNNING | 111 | Y | Y | |
SHUTDOWN | 000 | N | Y | 不会接收新任务,但会处理阻塞队列剩余 任务 |
STOP | 001 | N | N | 会中断正在执行的任务,并抛弃阻塞队列 任务 |
TIDYING | 010 | - | - | 任务全执行完毕,活动线程为 0 即将进入 终结 |
TERMINATED | 011 | - | - | 终结状态 |
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作
进行赋值
// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }
常用参数(重点)
• corePoolSize线程池的核心线程数
• maximumPoolSize能容纳的最大线程数
• keepAliveTime空闲线程存活时间
• unit 存活的时间单位
• workQueue 存放提交但未执行任务的队列
• threadFactory 创建线程的工厂类
• handler 等待队列满后的拒绝策略
线程池中,有三个重要的参数,决定影响了拒绝策略:corePoolSize - 核心线程数,也即最小的线程数。workQueue - 阻塞队列 。 maximumPoolSize - 最大线程数
当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到
maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。
总结起来,也就是一句话,当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。
拒绝策略(重点)
CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
AbortPolicy: 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
DiscardPolicy: 直接丢弃,其他啥都没有,不做任何处理也不抛出异常,如果允许任务丢失,这是最好的一种策略。
DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务(等待最久的任务),并将新任务加入队列中,尝试再次提交当前任务
一些框架也提供了实现
Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方
便定位问题
Netty 的实现,是创建一个新线程来执行任务
ActiveMQ 的实现,带超时等待(60s)尝试放入队列
PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
线程池的种类与创建
newCachedThreadPool(常用)
作用:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程.
特点:
核心线程数为0
• 线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE)
• 线程池中的线程可进行缓存重复利用和回收(回收默认时间为1分钟)
• 当线程池中,没有可用线程,会重新创建一个线程
队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交
货)
创建方式:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
场景: 适用于创建一个可无限扩大的线程池,服务器负载压力较轻,执行时间较短,任务多的场景
newFixedThreadPool(常用)
作用:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。
特征:
• 线程池中的线程处于一定的量,可以很好的控制线程的并发量
• 线程可以重复被使用,在显示关闭之前,都将一直存在
• 超出一定量的线程被提交时候需在队列中等待
创建方式:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
场景: 适用于可以预测线程数量的业务中,或者服务器负载较重,对线程数有严格限制的场景
newSingleThreadExecutor(常用)
作用:创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的 newFixedThreadPool不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。
特征: 线程池中最多执行1个线程,之后提交的线程活动将会排在队列中以此执行,任务执行完毕,这唯一的线程
也不会被释放。
创建方式:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
场景: 适用于需要保证顺序执行各个任务,并且在任意时间点,不会同时有多个线程的场景
Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因
此不能调用 ThreadPoolExecutor 中特有的方法
Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改
对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改
newScheduleThreadPool(了解)
作用: 线程池支持定时以及周期性执行任务,创建一个corePoolSize为传入参数,最大线程数为整形的最大数的线程池
特征:
(1)线程池中具有指定数量的线程,即便是空线程也将保留 (2)可定时或者延迟执行线程活动
创建方式:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
场景: 适用于需要多个后台线程执行周期任务的场景
这四种线程池底层都是调用的ThreadPoolExecutor
newWorkStealingPool
jdk1.8提供的线程池,底层使用的是ForkJoinPool实现,创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu核数的线程来并行执行任务
创建方式:
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
场景: 适用于大耗时,可并行执行的场景
线程池提交任务
ExecutorService.java
// 执行任务
void execute (Runnable command);
// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交 tasks 中所有任务
<T> List < Future <T>> invokeAll(Collection < ? extends Callable<T>>tasks)
throws InterruptedException;
// 提交 tasks 中所有任务,带超时时间
<T> List < Future <T>> invokeAll(Collection < ? extends Callable<T>>tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection < ? extends Callable<T>>tasks)
throws InterruptedException, ExecutionException;
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection < ? extends Callable<T>>tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
线程池的关闭
shutdown
/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行
*/
void shutdown();
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(SHUTDOWN);
// 仅会打断空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
tryTerminate();
}
shutdownNow
/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(STOP);
// 打断所有线程
interruptWorkers();
// 获取队列中剩余任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终结
tryTerminate();
return tasks;
}
其它方法
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事
情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
线程池入门案例
场景: 火车站3个售票口, 10个用户买票
newFixedThreadPool
package com.dongguo.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Dongguo
* @date 2021/8/24 0024-21:37
* @description:
*/
public class ThreadPoolDemo1 {
public static void main(String[] args) {
//3个窗口
ExecutorService pool1 = Executors.newFixedThreadPool(3);
//10个顾客请求=
try {
for (int i = 1; i <=10 ; i++) {
pool1.execute(()->{
System.out.println(Thread.currentThread().getName()+"办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
pool1.shutdown();
}
}
}
运行结果:
pool-1-thread-2办理业务
pool-1-thread-3办理业务
pool-1-thread-1办理业务
pool-1-thread-2办理业务
pool-1-thread-3办理业务
pool-1-thread-3办理业务
pool-1-thread-3办理业务
pool-1-thread-3办理业务
pool-1-thread-2办理业务
pool-1-thread-1办理业务
newSingleThreadExecutor
package com.dongguo.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Dongguo
* @date 2021/8/24 0024-21:37
* @description:
*/
public class ThreadPoolDemo1 {
public static void main(String[] args) {
//3个窗口
// ExecutorService pool1 = Executors.newFixedThreadPool(3);
//1个窗口
ExecutorService pool2 = Executors.newSingleThreadExecutor();
//10个顾客请求=
try {
for (int i = 1; i <= 10; i++) {
pool2.execute(() -> {
System.out.println(Thread.currentThread().getName() + "办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
pool2.shutdown();
}
}
}
运行结果:
pool-1-thread-1办理业务
pool-1-thread-1办理业务
pool-1-thread-1办理业务
pool-1-thread-1办理业务
pool-1-thread-1办理业务
pool-1-thread-1办理业务
pool-1-thread-1办理业务
pool-1-thread-1办理业务
pool-1-thread-1办理业务
pool-1-thread-1办理业务
newCachedThreadPool
package com.dongguo.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Dongguo
* @date 2021/8/24 0024-21:37
* @description:
*/
public class ThreadPoolDemo1 {
public static void main(String[] args) {
//3个窗口
// ExecutorService pool1 = Executors.newFixedThreadPool(3);
//1个窗口
// ExecutorService pool2 = Executors.newSingleThreadExecutor();
//可扩容的窗口
ExecutorService pool3 = Executors.newCachedThreadPool();
//10个顾客请求
try {
for (int i = 1; i <= 10; i++) {
pool3.execute(() -> {
System.out.println(Thread.currentThread().getName() + "办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
pool3.shutdown();
}
}
}
运行结果:
pool-1-thread-1办理业务
pool-1-thread-2办理业务
pool-1-thread-3办理业务
pool-1-thread-4办理业务
pool-1-thread-5办理业务
pool-1-thread-6办理业务
pool-1-thread-7办理业务
pool-1-thread-8办理业务
pool-1-thread-1办理业务
pool-1-thread-8办理业务
newScheduledThreadPool
在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但
由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个
任务的延迟或异常都将会影响到之后的任务。
package com.dongguo.pool;
import lombok.extern.slf4j.Slf4j;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-12:26
* @description:
*/
@Slf4j(topic = "d.TimerTaskDemo")
public class TimerTaskDemo {
public static void main(String[] args) {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
log.debug("task 1");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
log.debug("task 2");
}
};
log.debug("start...");
// 使用 timer 添加两个任务,希望它们都在 1s 后执行
// 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行
timer.schedule(task1, 1000);
timer.schedule(task2, 1000);
}
}
运行结果
12:39:58 [main] d.TimerTaskDemo - start...
12:39:59 [Timer-0] d.TimerTaskDemo - task 1
12:40:01 [Timer-0] d.TimerTaskDemo - task 2
本应该在start一秒后,task1、task2执行 但是timer中两个任务需要顺序执行,task1执行两秒结束后后task2才执行
package com.dongguo.pool;
import lombok.extern.slf4j.Slf4j;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-12:26
* @description:
*/
@Slf4j(topic = "d.TimerTaskDemo")
public class TimerTaskDemo {
public static void main(String[] args) {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
log.debug("task 1");
try {
int i = 10/0;
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
log.debug("task 2");
}
};
log.debug("start...");
// 使用 timer 添加两个任务,希望它们都在 1s 后执行
// 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行
timer.schedule(task1, 1000);
timer.schedule(task2, 1000);
}
}
运行结果
12:46:42 [main] d.TimerTaskDemo - start...
12:46:43 [Timer-0] d.TimerTaskDemo - task 1
Exception in thread "Timer-0" java.lang.ArithmeticException: / by zero
at com.dongguo.pool.TimerTaskDemo$1.run(TimerTaskDemo.java:23)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)
如果task1出现异常,那task2也就无法执行了
使用 ScheduledExecutorService 实现定时功能
package com.dongguo.pool;
import lombok.extern.slf4j.Slf4j;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-12:43
* @description:
*/
@Slf4j(topic = "d.ScheduledThreadPoolDemo")
public class ScheduledThreadPoolDemo {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);//创建两个线程
// 添加两个任务,希望它们都在 1s 后执行
executor.schedule(() -> {
log.debug("task 1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
}, 1, TimeUnit.SECONDS);
executor.schedule(() -> {
log.debug("task 2");
}, 1, TimeUnit.SECONDS);
}
}
运行结果
12:49:46 [pool-1-thread-2] d.ScheduledThreadPoolDemo - task 2
12:49:46 [pool-1-thread-1] d.ScheduledThreadPoolDemo - task 1
使用 ScheduledExecutorService 实现周期执行
package com.dongguo.pool;
import lombok.extern.slf4j.Slf4j;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-12:43
* @description:
*/
@Slf4j(topic = "d.ScheduledThreadPoolDemo")
public class ScheduledThreadPoolDemo {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
log.debug("start...");
executor.scheduleAtFixedRate(() -> {
log.debug("task running");
}, 1,1, TimeUnit.SECONDS);//延时时间1s,之后每隔1s执行一次
}
}
scheduleAtFixedRate如果执行所需要的时间超过周期时间
package com.dongguo.pool;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-12:43
* @description:
*/
@Slf4j(topic = "d.ScheduledThreadPoolDemo")
public class ScheduledThreadPoolDemo {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
log.debug("start...");
executor.scheduleAtFixedRate(() -> {
log.debug("task running");
try {
TimeUnit.SECONDS.sleep(2);//如果执行所需要的时间超过周期时间
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1,1, TimeUnit.SECONDS);//延时时间1s,之后每隔1s执行一次
}
}
一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 2s
scheduleWithFixedDelay如果执行所需要的时间超过周期时间
package com.dongguo.pool;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-12:43
* @description:
*/
@Slf4j(topic = "d.ScheduledThreadPoolDemo")
public class ScheduledThreadPoolDemo {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
log.debug("start...");
// executor.scheduleAtFixedRate(() -> {
// log.debug("task running");
// try {
// TimeUnit.SECONDS.sleep(2);//如果执行所需要的时间超过周期时间
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }, 1,1, TimeUnit.SECONDS);//延时时间1s,之后每隔1s执行一次
executor.scheduleWithFixedDelay(() -> {
log.debug("task running");
try {
TimeUnit.SECONDS.sleep(2);//如果执行所需要的时间超过周期时间
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1,1, TimeUnit.SECONDS);//延时时间1s,之后每隔1s执行一次
}
}
一开始,延时 1s,scheduleWithFixedDelay 的间隔是 上一个任务结束 <-> 延时1s <-> 下一个任务开始执行2s
所以间隔都是 3s
异常处理
线程池对于出现异常不会主动提示
package com.dongguo.pool;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Dongguo
* @date 2021/9/13 0013-13:12
* @description:
*/
@Slf4j(topic = "d.FixedThreadPoolDemo")
public class FixedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(1);
pool.submit(() -> {
log.debug("task1");
int i = 1 / 0;
});
}
}
可以选择手动try catch
package com.dongguo.pool;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Dongguo
* @date 2021/9/13 0013-13:12
* @description:
*/
@Slf4j(topic = "d.FixedThreadPoolDemo")
public class FixedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(1);
pool.submit(() -> {
try {
log.debug("task1");
int i = 1 / 0;
} catch (Exception e) {
log.error("error:", e);
}
});
}
}
运行结果
13:13:54 [pool-1-thread-1] d.FixedThreadPoolDemo - task1
13:13:54 [pool-1-thread-1] d.FixedThreadPoolDemo - error:
java.lang.ArithmeticException: / by zero
at com.dongguo.pool.FixedThreadPoolDemo.lambda$main$0(FixedThreadPoolDemo.java:20)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
也可以使用 Future获得执行的结果
package com.dongguo.pool;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
* @author Dongguo
* @date 2021/9/13 0013-13:12
* @description:
*/
@Slf4j(topic = "d.FixedThreadPoolDemo")
public class FixedThreadPoolDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(1);
Future<Boolean> f = pool.submit(() -> {
log.debug("task1");
int i = 1 / 0;
return true;
});
log.debug("result:{}", f.get());
}
}
运行结果
13:16:50 [pool-1-thread-1] d.FixedThreadPoolDemo - task1
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.dongguo.pool.FixedThreadPoolDemo.main(FixedThreadPoolDemo.java:24)
Caused by: java.lang.ArithmeticException: / by zero
at com.dongguo.pool.FixedThreadPoolDemo.lambda$main$0(FixedThreadPoolDemo.java:21)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
线程池底层工作原理(重要)
-
在创建了线程池后,线程池中的线程数为零
-
当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
2.1 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
2.2 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
2.3 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
2.4 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
-
当一个线程完成任务时,它会从队列中取下一个任务来执行
-
当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
4.1 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
4.2 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小
当队列满了,创建的非核心线程池是处理新的任务还是处理队列中的任务
线程池里的线程会去取这个队列里的任务。当一个新任务插入队列时,一个空闲线程就会成功的从队列中取出任务并且执行它。
当阻塞队列满了时,会创建非核心线程池处理新的任务,如果后续有线程空闲,会取出队列中的任务执行,新的任务如果发现核心线程满了,阻塞队列没满,就会进入队列中排队。
注意事项(重要)
-
项目中创建多线程时,使用常见的三种线程池创建方式,单一、可变、定长都有一定问题,原因是FixedThreadPool和SingleThreadExecutor底层都是用LinkedBlockingQueue实现的,这个队列最大长度为Integer.MAX_VALUE,容易导致OOM。所以实际生产一般自己通过ThreadPoolExecutor的7个参数,自定义线程池
-
创建线程池推荐使用ThreadPoolExecutor及其7个参数手动创建
corePoolSize线程池的核心线程数
maximumPoolSize能容纳的最大线程数
keepAliveTime空闲线程存活时间
unit 存活的时间单位
workQueue 存放提交但未执行任务的队列
threadFactory 创建线程的工厂类
handler 等待队列满后的拒绝策略 -
为什么不允许适用不允许Executors.的方式手动创建线程池,如下图
-
自定义线程池ThreadPoolExecutor
拒绝策略AbortPolicy
package com.dongguo.pool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/8/24 0024-21:52
* @description: 自定义创建线程池
*/
public class ThreadPoolDemo2 {
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
//10个顾客请求
try {
for (int i = 1; i <= 10; i++) {
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
pool.shutdown();
}
}
}
运行结果:
pool-1-thread-1办理业务
pool-1-thread-2办理业务
pool-1-thread-2办理业务
pool-1-thread-2办理业务
pool-1-thread-2办理业务
pool-1-thread-3办理业务
pool-1-thread-4办理业务
pool-1-thread-5办理业务
java.util.concurrent.RejectedExecutionException: Task com.dongguo.pool.ThreadPoolDemo2$$Lambda$1/1989780873@16b98e56 rejected from java.util.concurrent.ThreadPoolExecutor@7ef20235[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 2]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.dongguo.pool.ThreadPoolDemo2.main(ThreadPoolDemo2.java:27)
拒绝策略CallerRunsPolicy
package com.dongguo.pool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/8/24 0024-21:52
* @description: 自定义创建线程池
*/
public class ThreadPoolDemo2 {
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
//10个顾客请求
try {
for (int i = 1; i <= 10; i++) {
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
pool.shutdown();
}
}
}
运行结果
pool-1-thread-2办理业务
main办理业务
pool-1-thread-1办理业务
main办理业务
pool-1-thread-4办理业务
pool-1-thread-1办理业务
pool-1-thread-4办理业务
pool-1-thread-1办理业务
pool-1-thread-3办理业务
pool-1-thread-5办理业务
手写自定义线程池
手写自定义线程池(不带拒绝策略)
阻塞队列BlockingQueue
package com.dongguo.pool;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author Dongguo
* @date 2021/9/13 0013-8:17
* @description: 自定义阻塞队列
*/
public class BlockingQueue<T> {
// 1. 任务队列 双向队列
private Deque<T> queue = new ArrayDeque<>();
// 2. 锁
private ReentrantLock lock = new ReentrantLock();
// 3. 生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 4. 消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
// 5. 容量
private int capcity;
public BlockingQueue(int capcity) {
this.capcity = capcity;
}
/**
* 阻塞获取
* @return
*/
public T take(){
lock.lock();
try {
//当队列为空,无法获取,等待非空
while (queue.isEmpty()){
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//不为空获取队首元素
T t = queue.removeFirst();
//唤醒生产者
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
/**
* 带超时时间阻塞添加
* @return
*/
public T poll(long timeout, TimeUnit unit){
lock.lock();
try {
//统一时间单位纳秒
long nanos = unit.toNanos(timeout);
//当队列为空,无法获取,等待非空
while (queue.isEmpty()){
try {
// 返回值是剩余时间
if (nanos <= 0) {
return null;
}
//为了防止虚假唤醒导致超时时间不变
/**API示例
* <pre> {@code
* boolean aMethod(long timeout, TimeUnit unit) {
* long nanos = unit.toNanos(timeout);
* lock.lock();
* try {
* while (!conditionBeingWaitedFor()) {
* if (nanos <= 0L)
* return false;
* nanos = theCondition.awaitNanos(nanos);
* }
* // ...
* } finally {
* lock.unlock();
* }
* }}</pre>
*@return an estimate of the {@code nanosTimeout} value minus
* the time spent waiting upon return from this method.
* A positive value may be used as the argument to a
* subsequent call to this method to finish waiting out
* the desired time. A value less than or equal to zero
* indicates that no time remains.
*
* 返回的nanos的值就等于传入的nanos的值减去awaitNanos方法执行的时间
*/
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//不为空获取队首元素
T t = queue.removeFirst();
//唤醒生产者
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
/**
* 阻塞添加
*/
public void put(T task) {
lock.lock();
try {
//当队列满了,等待队列可用
while (queue.size() == capcity) {
try {
System.out.println(Thread.currentThread().getName()+" 等待加入任务队列:"+task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+" 加入任务队列:"+task);
//从队列尾部添加元素
queue.addLast(task);
//唤醒消费者
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
/**
* 带超时时间阻塞添加
* @param task
* @param timeout
* @param timeUnit
* @return
*/
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capcity) {
try {
if(nanos <= 0) {
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
/**
* 获得当前队列的大小
* @return
*/
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}
线程池ThreadPool
1测试阻塞获取take()
package com.dongguo.pool;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-8:50
* @description:
*/
public class ThreadPool {
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
// 核心线程数
private int coreSize;
// 获取任务时的超时时间
private long timeout;
private TimeUnit timeUnit;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
}
// 执行任务
public void execute(Runnable task) {
// 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
// 如果任务数超过 coreSize 时,加入任务队列暂存
synchronized (workers) {
if(workers.size() < coreSize) {
Worker worker = new Worker(task);
System.out.println(Thread.currentThread().getName()+" 新增worker:"+worker);
workers.add(worker);
worker.start();
} else {
taskQueue.put(task);
}
}
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 执行任务
// 1) 当 task 不为空,执行任务
// 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
while(task != null || (task = taskQueue.take()) != null) {
// while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
System.out.println(Thread.currentThread().getName()+" 正在执行。。"+task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
System.out.println(Thread.currentThread().getName()+" worker:"+this+"被移除");
workers.remove(this);
}
}
}
}
测试
package com.dongguo.pool;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-9:11
* @description:
*/
public class Client {
public static void main(String[] args) {
ThreadPool threadPool =new ThreadPool(2,1000, TimeUnit.MICROSECONDS,10);
for (int i = 0; i < 5; i++) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
});
}
}
}
一共有5个任务请求
核心线程数设置为2,main线程创建两个worker 工作线程,两个worker工作线程执行两个任务请求,其余三个任务请求放入任务队列中。
两个worker工作线程执行完各自任务请求,再执行任务队列中的请求。
那么5个任务就执行完了
但是从任务队列中获取任务使用的是taskQueue.take(),如果任务队列中没有任务了会一直阻塞在那里
2测试有超时时间的阻塞获取poll(timeout, timeUnit)
package com.dongguo.pool;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-8:50
* @description:
*/
public class ThreadPool {
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
// 核心线程数
private int coreSize;
// 获取任务时的超时时间
private long timeout;
private TimeUnit timeUnit;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
}
// 执行任务
public void execute(Runnable task) {
// 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
// 如果任务数超过 coreSize 时,加入任务队列暂存
synchronized (workers) {
if(workers.size() < coreSize) {
Worker worker = new Worker(task);
System.out.println(Thread.currentThread().getName()+" 新增worker:"+worker);
workers.add(worker);
worker.start();
} else {
taskQueue.put(task);
}
}
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 执行任务
// 1) 当 task 不为空,执行任务
// 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
// while(task != null || (task = taskQueue.take()) != null) {
while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
System.out.println(Thread.currentThread().getName()+" 正在执行。。"+task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
System.out.println(Thread.currentThread().getName()+" worker:"+this+"被移除");
workers.remove(this);
}
}
}
}
重新执行
package com.dongguo.pool;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-9:11
* @description:
*/
public class Client {
public static void main(String[] args) {
ThreadPool threadPool =new ThreadPool(2,1000, TimeUnit.MICROSECONDS,10);
for (int i = 0; i < 5; i++) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
});
}
}
}
超时时间设置的是1s
任务队列中获取任务使用的是taskQueue.poll(timeout, timeUnit),如果任务队列中没有任务了等待1s如果还没有新的任务请求入队,就放弃等待
手写自定义线程池(带有拒绝策略)
package com.dongguo.pool;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-9:11
* @description:
*/
public class Client {
public static void main(String[] args) {
ThreadPool threadPool =new ThreadPool(2,1000, TimeUnit.MICROSECONDS,10);
for (int i = 0; i < 15; i++) {
int j = i;
threadPool.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1000L);//演示15个任务请求
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
});
}
}
}
核心线程数为2,任务队列大小为10,一共最多能处理12个任务请求
如果超过12个,第13个任务请求就会阻塞等待加入任务队列
这里可以使用带超时时间阻塞添加
package com.dongguo.pool;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-8:50
* @description:
*/
public class ThreadPool {
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
// 核心线程数
private int coreSize;
// 获取任务时的超时时间
private long timeout;
private TimeUnit timeUnit;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
}
// 执行任务
public void execute(Runnable task) {
// 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
// 如果任务数超过 coreSize 时,加入任务队列暂存
synchronized (workers) {
if(workers.size() < coreSize) {
Worker worker = new Worker(task);
System.out.println(Thread.currentThread().getName()+" 新增worker:"+worker);
workers.add(worker);
worker.start();
} else {
// 1) 死等
// taskQueue.put(task);
// 2) 带超时等待
taskQueue.offer(task,timeout,timeUnit);
// 3) 让调用者放弃任务执行
// 4) 让调用者抛出异常
// 5) 让调用者自己执行任务
// taskQueue.tryPut(rejectPolicy, task);
}
}
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 执行任务
// 1) 当 task 不为空,执行任务
// 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
// while(task != null || (task = taskQueue.take()) != null) {
while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
System.out.println(Thread.currentThread().getName()+" 正在执行。。"+task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
System.out.println(Thread.currentThread().getName()+" worker:"+this+"被移除");
workers.remove(this);
}
}
}
}
到底是阻塞获取还是有超时时间的阻塞获取还是不在获取等等,我们可以抽象出一个拒绝策略
我们可以在超过最大请求范围时,设置一个具体的拒绝策略,处理任务请求。
拒绝策略接口
package com.dongguo.pool;
/**
* @author Dongguo
* @date 2021/9/13 0013-9:03
* @description:
*/
@FunctionalInterface // 拒绝策略
interface RejectPolicy<T> {
void reject(BlockingQueue<T> queue, T task);
}
BlockingQueue任务队列新增tryPut 带超时时间阻塞添加
package com.dongguo.pool;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author Dongguo
* @date 2021/9/13 0013-8:17
* @description: 自定义阻塞队列
*/
public class BlockingQueue<T> {
// 1. 任务队列 双向队列
private Deque<T> queue = new ArrayDeque<>();
// 2. 锁
private ReentrantLock lock = new ReentrantLock();
// 3. 生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 4. 消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
// 5. 容量
private int capcity;
public BlockingQueue(int capcity) {
this.capcity = capcity;
}
/**
* 阻塞获取
* @return
*/
public T take(){
lock.lock();
try {
//当队列为空,无法获取,等待非空
while (queue.isEmpty()){
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//不为空获取队首元素
T t = queue.removeFirst();
//唤醒生产者
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
/**
* 带超时时间阻塞获取
* @return
*/
public T poll(long timeout, TimeUnit unit){
lock.lock();
try {
//统一时间单位纳秒
long nanos = unit.toNanos(timeout);
//当队列为空,无法获取,等待非空
while (queue.isEmpty()){
try {
// 返回值是剩余时间
if (nanos <= 0) {
return null;
}
//为了防止虚假唤醒导致超时时间不变
/**API示例
* <pre> {@code
* boolean aMethod(long timeout, TimeUnit unit) {
* long nanos = unit.toNanos(timeout);
* lock.lock();
* try {
* while (!conditionBeingWaitedFor()) {
* if (nanos <= 0L)
* return false;
* nanos = theCondition.awaitNanos(nanos);
* }
* // ...
* } finally {
* lock.unlock();
* }
* }}</pre>
*@return an estimate of the {@code nanosTimeout} value minus
* the time spent waiting upon return from this method.
* A positive value may be used as the argument to a
* subsequent call to this method to finish waiting out
* the desired time. A value less than or equal to zero
* indicates that no time remains.
*
* 返回的nanos的值就等于传入的nanos的值减去awaitNanos方法执行的时间
*/
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//不为空获取队首元素
T t = queue.removeFirst();
//唤醒生产者
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
/**
* 阻塞添加
*/
public void put(T task) {
lock.lock();
try {
//当队列满了,等待队列可用
while (queue.size() == capcity) {
try {
System.out.println(Thread.currentThread().getName()+" 等待加入任务队列:"+task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+" 加入任务队列:"+task);
//从队列尾部添加元素
queue.addLast(task);
//唤醒消费者
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
/**
* 带超时时间阻塞添加
* @param task
* @param timeout
* @param timeUnit
* @return
*/
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capcity) {
try {
if(nanos <= 0) {
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
/**
* 获得当前队列的大小
* @return
*/
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
/**
* 带拒绝策略的添加
* @param rejectPolicy
* @param task
*/
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
// 判断队列是否满
if(queue.size() == capcity) {
System.out.println(Thread.currentThread().getName()+" 任务队列满了执行策略:"+task);
rejectPolicy.reject(this, task);
} else { // 有空闲
System.out.println(Thread.currentThread().getName()+" 加入任务队列:"+task);
queue.addLast(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
}
ThreadPool线程池 当任务队列满了,由调用者决定执行哪种策略
package com.dongguo.pool;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-8:50
* @description:
*/
public class ThreadPool {
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
// 核心线程数
private int coreSize;
// 获取任务时的超时时间
private long timeout;
private TimeUnit timeUnit;
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity,
RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
this.rejectPolicy = rejectPolicy;
}
// 执行任务
public void execute(Runnable task) {
// 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
// 如果任务数超过 coreSize 时,加入任务队列暂存
synchronized (workers) {
if(workers.size() < coreSize) {
Worker worker = new Worker(task);
System.out.println(Thread.currentThread().getName()+" 新增worker:"+worker);
workers.add(worker);
worker.start();
} else {
// 1) 死等
// taskQueue.put(task);
// 2) 带超时等待
// taskQueue.offer(task,timeout,timeUnit);
// 3) 让调用者放弃任务执行
// 4) 让调用者抛出异常
// 5) 让调用者自己执行任务
taskQueue.tryPut(rejectPolicy, task);
}
}
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 执行任务
// 1) 当 task 不为空,执行任务
// 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
// while(task != null || (task = taskQueue.take()) != null) {
while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
System.out.println(Thread.currentThread().getName()+" 正在执行。。"+task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
System.out.println(Thread.currentThread().getName()+" worker:"+this+"被移除");
workers.remove(this);
}
}
}
}
策略1 阻塞获取,死等
package com.dongguo.pool;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-9:11
* @description:
*/
public class Client {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,
1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {
// 1. 死等
queue.put(task);
// 2) 带超时等待
// queue.offer(task, 1500, TimeUnit.MILLISECONDS);
// 3) 让调用者放弃任务执行
//System.out.println("放弃执行");
// 4) 让调用者抛出异常
// throw new RuntimeException("任务执行失败 " + task);
// 5) 让调用者自己执行任务
// task.run();
});
for (int i = 0; i < 5; i++) {
int j = i;
threadPool.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1000L);/
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
});
}
}
}
策略2有超时时间的等待
package com.dongguo.pool;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-9:11
* @description:
*/
public class Client {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,
1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {
// 1. 死等
// queue.put(task);
// 2) 带超时等待
queue.offer(task, 1500, TimeUnit.MILLISECONDS);
// 3) 让调用者放弃任务执行
//System.out.println("放弃执行");
// 4) 让调用者抛出异常
// throw new RuntimeException("任务执行失败 " + task);
// 5) 让调用者自己执行任务
// task.run();
});
for (int i = 0; i < 5; i++) {
int j = i;
threadPool.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
});
}
}
}
策略3放弃任务执行
package com.dongguo.pool;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-9:11
* @description:
*/
public class Client {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,
1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {
// 1. 死等
// queue.put(task);
// 2) 带超时等待
//queue.offer(task, 1500, TimeUnit.MILLISECONDS);
// 3) 让调用者放弃任务执行
System.out.println("放弃执行"+task);
// 4) 让调用者抛出异常
// throw new RuntimeException("任务执行失败 " + task);
// 5) 让调用者自己执行任务
// task.run();
});
for (int i = 0; i < 5; i++) {
int j = i;
threadPool.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
});
}
}
}
策略4抛出异常
package com.dongguo.pool;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-9:11
* @description:
*/
public class Client {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,
1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {
// 1. 死等
// queue.put(task);
// 2) 带超时等待
//queue.offer(task, 1500, TimeUnit.MILLISECONDS);
// 3) 让调用者放弃任务执行
// System.out.println("放弃执行"+task);
// 4) 让调用者抛出异常
throw new RuntimeException("任务执行失败 " + task);
// 5) 让调用者自己执行任务
// task.run();
});
for (int i = 0; i < 5; i++) {
int j = i;
threadPool.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
});
}
}
}
策略5让调用者自己执行任务
package com.dongguo.pool;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/9/13 0013-9:11
* @description:
*/
public class Client {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,
1000, TimeUnit.MILLISECONDS, 1, (queue, task) -> {
// 1. 死等
// queue.put(task);
// 2) 带超时等待
// queue.offer(task, 1500, TimeUnit.MILLISECONDS);
// 3) 让调用者放弃任务执行
// System.out.println("放弃执行"+task);
// 4) 让调用者抛出异常
// throw new RuntimeException("任务执行失败 " + task);
// 5) 让调用者自己执行任务
task.run();
});
for (int i = 0; i < 3; i++) {
int j = i;
threadPool.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(j);
});
}
}
}
最大线程数如何设置合理
过小会导致程序不能充分地利用系统资源、容易导致饥饿
过大会导致更多的线程上下文切换,占用更多内存
CPU密集型
**一般配置线程数=CPU总核心数+1 **
+1 是保证当线程由于页缺失故障(操作系统)或其它原因
导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费
**定义:**CPU密集型也是指计算密集型,大部分时间用来做计算逻辑判断等CPU动作的程序称为CPU密集型任务。该类型的任务需要进行大量的计算,主要消耗CPU资源。 这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。
特点:
01:CPU 使用率较高(也就是经常计算一些复杂的运算,逻辑处理等情况)非常多的情况下使用
02:针对单台机器,最大线程数一般只需要设置为CPU核心数的线程个数就可以了
03:这一类型多出现在开发中的一些业务复杂计算和逻辑处理过程中。
Runtime.getRuntime().availableProcessors(), //最大核心线程池大小(CPU密集型,根据CPU核数设置)
IO密集型
**一般配置线程数=CPU总核心数 * 2 **
**定义:**IO密集型任务指任务需要执行大量的IO操作,涉及到网络、磁盘IO操作,对CPU消耗较少,其消耗的主要资源为IO。
我们所接触到的 IO ,大致可以分成两种:磁盘 IO和网络 IO。
**01:**磁盘 IO ,大多都是一些针对磁盘的读写操作,最常见的就是文件的读写,假如你的数据库、 Redis 也是在本地的话,那么这个也属于磁盘 IO。
**02:**网络 IO ,这个应该是大家更加熟悉的,我们会遇到各种网络请求,比如 http 请求、远程数据库读写、远程 Redis 读写等等。
IO 操作的特点就是需要等待,我们请求一些数据,由对方将数据写入缓冲区,在这段时间中,需要读取数据的线程根本无事可做,因此可以把 CPU 时间片让出去,直到缓冲区写满。
既然这样,IO 密集型任务其实就有很大的优化空间了(毕竟存在等待):
CPU 使用率较低,程序中会存在大量的 I/O 操作占用时间,导致线程空余时间很多,所以通常就需要开CPU核心数两倍的线程。当线程进行 I/O 操作 CPU 空闲时,线程等待时间所占比例越高,就需要越多线程,启用其他线程继续使用 CPU,以此提高 CPU 的使用率;线程 CPU 时间所占比例越高,需要越少的线程,这一类型在开发中主要出现在一些计算业务频繁的逻辑中。
根据并发编程网的《如何合理地估算线程池大小》一文中的提示,
最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32。这个公式进一步转化为:
可以得出一个结论:
最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目
(1.5/0.5+1)*8=32
线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。
最后
以上就是心灵美歌曲为你收集整理的ThreadPool线程池线程池ThreadPoolExecutor参数说明手写自定义线程池最大线程数如何设置合理的全部内容,希望文章能够帮你解决ThreadPool线程池线程池ThreadPoolExecutor参数说明手写自定义线程池最大线程数如何设置合理所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复