概述
什么是线程池?
线程池内部维护了若干线程,没有人物的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。
线程池常用的类和接口
在Java标准库提供了如下几个类或接口,来操作并使用线程池:
- ExecutorService接口:来进行管理操作线程池;
- Executors类:用于创建线程池的工具类;
- ThreadPoolExecutor及其子类:线程池;
基本使用方法如下:
// 线程池基本使用方式
// 创建一个ThreadPoolExecutor类型的对象,代表固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(3); // 该线程池拥有3个线程
// 执行线程任务
executor.execute(task1);
executor.execute(task2);
executor.execute(task3);
executor.execute(task4);
executor.execute(task5);
// 使用结束后,使用shutdown关闭线程池
executor.shutdown();
线程池的常用方法:
- 执行无返回值的线程任务:void exeutor(Runnable command);
- 提交有返回值的线程任务:Future<T> submit(Callable<T>task);
- 关闭线程池:void shutdown(); 或shutdown();
- 等待线程池关闭:boolean awaitTermination(long timeout,TimeUnit unit);
execute()只能提交Runnable类型的任务,没有返回值,而submit()既能提交Runnable类型的任务也能提交Callable类型任务,可以返回Future类型的结果,用于获取线程任务执行结果。
execute()方法提交的任务异常时直接抛出的,而submit()方法是捕获异常,当调用Future的get()方法获取返回值时,才会抛出异常。
// 计算1-100w的之间所有数字的累加和,每10w个数字交给1个线程处理
// 创建一个固定大小的线程池:
ExecutorService executorService = Executors.newFixedThreadPool(4);
// 创建集合,用于保存Future执行结果
List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
// 每10w个数字,封装成一个Callable线程任务,并提交给线程池
for (int i = 0; i <= 900000; i += 100000) {
Future<Integer> result = executorService.submit(new CalcTask(i+1, i + 100000));
futureList.add(result);
}
// 处理线程任务执行结果
try {
int result = 0;
for (Future<Integer> f : futureList) {
result += f.get();
}
System.out.println("计算结果" + result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
// 关闭线程池
// 省略.....
线程池在程序结束的时候要关闭。使用shutdown()方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()会立刻停止正在执行的任务;
当使用awaitTermination()方法时,主线程会处于一种等待的状态,按照指定的timeout检查线程池。
第一个参数指定的是时间,第二个参数指定的是时间单位(秒),返回值类型是boolean型。
- 如果等待的时间超过指定的时间,但是线程池中的线程运行完毕,awaitTermination()返回true;
- 如果等待的时间超过指定的时间,但是线程池中的线程未运行完毕,awaitTermination()返回false;
- 如果等待时间没有超过指定时间,则继续等待。
该方法经常与shutdown()方法配合使用,用于检测线程池中的任务是否已经执行完毕:
// 线程池已提交或执行若干个任务
// 关闭线程池:必须等待任务执行结束后,线程池才会关闭
executorService.shutdown();
// 每隔1秒钟,检查一次线程池的任务执行状态
while(!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
System.out.println("还没有关闭!");
}
System.out.println("已关闭!");
线程池分类:
ExecutorService是一个线程池管理接口,Java标准库提供了几个常用线程池,创建这些线程池的方法都被封装到Executors工具类中。
- FixedThreadPool:线程数固定的线程池,使用Executors.newFixedThreadPool()创建;
- CachedThreadPool:线程数根据任务动态调整的线程池,使用Executors.newSingleThreadExecutor()创建;
- SingleThreadExecutor:仅提供一个单线程的线程池,使用Executors.newSingleThreadExecutor()创建;
- ScheduledThreadPool:能实现定时、周期性人物的线程池,使用Executors.newScheduledThreadPool()创建;
FixedThreadPool线程池
线程数固定的线程池:
public class Main {
public static void main(String[] args) {
// 创建一个固定大小的线程池:
ExecutorService executorService = Executors.newFixedThreadPool(4);
for (int i = 0; i < 6; i++) {
executorService.execute(new Task("线程"+i));
}
// 关闭线程池:
executorService.shutdown();
}
}
class Task implements Runnable {
private String taskName;
public Task(String taskName) {
this.taskName = taskName;
}
@Override
public void run() {
System.out.println("启动线程 ===> " + this.taskName);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println("结束线程 <= " + this.taskName);
}
}
执行结果:
启动线程 ===> 线程2
启动线程 ===> 线程3
启动线程 ===> 线程0
启动线程 ===> 线程1
结束线程 <= 线程2
结束线程 <= 线程3
结束线程 <= 线程1
结束线程 <= 线程0
启动线程 ===> 线程5
启动线程 ===> 线程4
结束线程 <= 线程4
结束线程 <= 线程5
执行分析:
观察执行结果,一次性放入6个任务,由于线程池只有固定的4个线程,因此,前4个任务会同时执行,等到有线程空闲后,才会执行后面的两个任务。
CachedThreadPool线程池
线程数根据任务动态调整的线程池
ExecutorService executorService = Executors.newCachedThreadPool();
执行结果:
启动线程 => 线程1
启动线程 => 线程5
启动线程 => 线程2
启动线程 => 线程4
启动线程 => 线程0
启动线程 => 线程3
结束线程 <= 线程4
结束线程 <= 线程1
结束线程 <= 线程5
结束线程 <= 线程0
结束线程 <= 线程3
结束线程 <= 线程2
执行分析:
观察执行结果,由于这个线程池的实现会根据任务数量动态调整线程池的大小,所以6个任务可以一次性全部同时执行。
ScheduledThreadPool线程池
能实现定时、周期性任务的线程:
放入ScheduledTrheadPool的任务可以定期反复执行:
创建ScheduledThreadPool定时任务线程池:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
延迟3秒后执行,任务只执行一次:
executorService.schedule(new Task("线程A"), 3, TimeUnit.SECONDS);
延迟2秒后,每隔3秒执行任务1次
// 方式1:总是以固定的时间间隔触发,不管任务会执行多长时间;
executorService.scheduleAtFixedRate(new Task("线程A"), 2,3, TimeUnit.SECONDS);
// 方式2:上一次任务执行完,等待固定的时间间隔,在执行下一次任务
executorService.scheduleWithFixedDelay(new Task("线程A"), 2,3, TimeUnit.SECONDS);
线程池的执行流程
- 提交一个线程任务,线程池会在线程池中分配一个空闲线程,用于执行线程任务;
- 如果线程池中不存在空闲线程,则线程池会判断当前“存活的线程数”是否小于核心线程数corePoolSize。
- 如果小于核心线程数,线程池会创建一个核心线程去处理提交的线程任务。
- 如果大于核心线程数,则线程池会判断工作队列是否已满;
- 如果工作队列未满,则将该线程任务放入工作队列,等待线程池从工作队列取出并 执行;
- 如果工作队列已满,则判断线程数是否已经达到maximumPoolSize;
- 如果当前存活线程数没有达到maximumPoolSize,则创建一个非核心线程执行提交的任务;
- 如果当前存活线程数已经达到maximumPoolSize,还有新的任务被提交,直接采用拒绝策略处理;
线程池的配置参数
- corePoolSize:线程池核心线程数量,也可以理解为线程池维护的最小线程数量,核心线程创建后不会被回收。大于核心线程数的线程,在空闲时间超过了keepAliveTime后会被回收;
- maximumPoolSize:线程池最大线程数,线程池允许创建的最大线程数量;
- keepAliveTime:非核心线程存活时间,当一个可被回收的线程空闲时间大于keepAliveTime,就会被回收;
- TimeUnit:参数keepAliveTime的时间单位;
- blockingQueue:一个阻塞队列,用来存储等待执行的任务;
- RejectedExecutionHandler:拒绝策略,当线程池内的线程耗尽。并且工作队列达到已满时,新提交的任务,将使用拒绝策略进行处理;
阻塞工作队列BlockingQueue
BlockingQueue阻塞队列接口,实现机制是使用两条线程,允许两个线程同时操作队列,一个线程用于存储(生产者),一个线程用于取出(消费者)。当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列,在保证并发安全的同时,提高了队列的存取效率。
生产者消费者模型:
public class RedPackage {
private String rpid;
private BigDecimal amount;
public RedPackage() {
this.rpid = UUID.randomUUID().toString();
this.amount = new BigDecimal(String.valueOf(Math.random()*10));
}
public String getRpid() {
return rpid;
}
public void setRpid(String rpid) {
this.rpid = rpid;
}
public BigDecimal getAmount() {
return amount;
}
public void setAmount(BigDecimal amount) {
this.amount = amount;
}
@Override
public String toString() {
return "红包 [rpid=" + rpid + ", amount=" + amount + "]";
}
}
public class Producer implements Runnable {
private BlockingQueue<RedPackage> redPackageQueue;
public Producer(BlockingQueue<RedPackage> queue) {
this.redPackageQueue = queue;
}
@Override
public void run() {
while (true) {
if(redPackageQueue.remainingCapacity() > 0) {
RedPackage rpk = new RedPackage();
redPackageQueue.offer(rpk);
System.out.println("[生产者] 已生成1个红包,并放入队列!" + rpk);
}else if(redPackageQueue.remainingCapacity() == 0) {
System.out.println("[生产者] 红包队列已满!");
}
}
}
}
public class Consumer implements Runnable {
private BlockingQueue<RedPackage> redPackageQueue;
public Consumer(BlockingQueue<RedPackage> queue) {
this.redPackageQueue = queue;
}
@Override
public void run() {
while (true) {
RedPackage rpk = null;
try {
rpk = redPackageQueue.poll(1000,TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(rpk != null) {
System.out.println("[消费者]线程" + Thread.currentThread().getName() + "抢到红包:" + rpk);
}else {
System.out.println("[消费者]很遗憾,线程" + Thread.currentThread().getName() + "没有抢到红包");
}
}
}
}
常用BlockingQueue阻塞队列实现类
ArrayBlockingQueue
ArrayBlockingQueue是一个有界队列,基于数组实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,按照FIFO的方式排序:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** The queued items */
// ArrayBlockingQueue使用定长数组做为存储结构
final Object[] items;
// 创建时传入数组容量(长度)
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
}
// 使用ArrayBlockingQueue创建自定义线程池
ExecutorService executorService = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
使用ArrayBlockinQueue有界队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于线程数量达到maximumPoolSize,则执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。
LinkedBlockingQueue
LinkedBlockingQueue是一个无界队列,基于单向链表结构,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,LinkedBlockingQueue吞吐量通常要高于ArrayBlockingQueue。
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 单向链表Node节点
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
// 按照Integer.MAX_VALUE设置容量
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
}
FixedTheadPool、SingleThreadExecutor线程池使用LinkedBlockingQueue队列:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是corePoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意任务与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。
DelayWorkQueue
DelayWorkQueue是基于堆结构的延迟队列,基于数组实现,初始容量16,leader线程用于获取堆顶元素。该队列根据指定的延迟时间从小到大排序,如果延迟时间相同,则根据插入队列的先后排序。
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
private static final int INITIAL_CAPACITY = 16;
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private Thread leader = null;
}
ScheduledThreadPool
线程池使用了这个队列:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
//.....
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
PriorityBlockingQueue(优先任务队列 )
PriorityBlockingQueue
是一个基于优先级的无界队列(优先级的判断通过构造函数传入的Compator
或元素实现Comparable
接口来决定)。
注意:PriorityBlockingQueue
并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。
class PayOrder implements Runnable,Comparable<PayOrder> {
private int orderNo; // 订单编号
private BigDecimal payment; // 支付金额
public PayOrder(int orderNo, BigDecimal payment) {
this.orderNo = orderNo;
this.payment = payment;
}
@Override
public int compareTo(PayOrder o) {
return this.payment.compareTo(o.payment);
}
@Override
public void run() {
System.out.printf("订单编号为%d,订单金额为:¥%.1f的订单已完成支付!【%s】n",orderNo,payment,Thread.currentThread().getName());
}
public int getOrderNo() {
return orderNo;
}
public void setOrderNo(int orderNo) {
this.orderNo = orderNo;
}
public BigDecimal getPayment() {
return payment;
}
public void setPayment(BigDecimal payment) {
this.payment = payment;
}
}
public class Main {
public static void main(String[] args) throws InterruptedException {
// 使用PriorityBlockingQueue创建线程池,核心线程数为2
ThreadPoolExecutor pool =
new ThreadPoolExecutor(2, 20, 10, TimeUnit.SECONDS,
new PriorityBlockingQueue<Runnable>());
// 执行10笔订单的支付任务
pool.execute(new PayOrder(1,new BigDecimal("1943")));
pool.execute(new PayOrder(2,new BigDecimal("7894")));
pool.execute(new PayOrder(3,new BigDecimal("3253")));
pool.execute(new PayOrder(4,new BigDecimal("1353")));
pool.execute(new PayOrder(5,new BigDecimal("6344")));
pool.execute(new PayOrder(6,new BigDecimal("5430")));
pool.execute(new PayOrder(7,new BigDecimal("3574")));
pool.execute(new PayOrder(8,new BigDecimal("3673")));
pool.execute(new PayOrder(9,new BigDecimal("8653")));
pool.execute(new PayOrder(10,new BigDecimal("1100")));
// 关闭线程池
pool.shutdown();
}
}
SynchronousQueue(同步队列)
不存储元素的阻塞队列(内部没有保存元素的数据结构容器),每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue
。
CachedThreadPool
线程池使用这个队列。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public class Main {
public static void main(String[] args) {
// maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略(直接抛出异常)
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 10, TimeUnit.MILLISECONDS,
new SynchronousQueue<Runnable>(),
new ThreadPoolExecutor.AbortPolicy());
// 执行的线程任务大于maximumPoolSize,执行拒绝策略
for (int i = 1; i <= 3; i++) {
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "被执行!");
}
});
}
// 关闭线程池
pool.shutdown();
}
}
当任务队列为SynchronousQueue
,创建的线程数大于maximumPoolSize
时,直接执行了拒绝策略抛出异常。
使用SynchronousQueue
队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize
,则尝试创建新的线程,如果达到maximumPoolSize
设置的最大值,则根据你设置的handler
执行拒绝策略。
因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize
数量,否则很容易就会执行拒绝策略;
线程池的状态
线程池的状态分为:running、shutdown、stop、tidying、terminated
- running:运行状态,线程池被一旦创建,就处于running状态,并且线程池中的任务数为0.该状态的线程池会接收新任务,并处理工作队列中的任务。
- shutdown:关闭状态,该状态的线程池不会接收新任务,但会处理工作队列中的任务;
- stop:停止状态,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
- tidying:整理状态,该状态表明所有的任务已经运行终止,记录的任务数量为0;
- terminated:该状态表示线程池彻底终止;
线程池分类总结
FixedThreadPool
线程数固定的线程池
- 核心线程数和最大线程数一致
- 非核心线程数空闲存活时间,即keepAliveTime为0
- 阻塞队列为无界队列
工作机制:
a、提交线程任务
b、如果线程数少于核心线程,创建核心线程任务
c、如果线程数等于核心线程,把任务添加到LinkedBlockingQueue阻塞队列
d、如果线程执行完任务,去阻塞队列取任务,继续执行
使用场景: 适用于处理CPU
密集型的任务,确保CPU
在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。
CachedThreadPool
可缓存线程池,线程数根据任务动态调整的线程池
- 线程池参数:
-
- 核心线程数为
0
- 最大线程数为
Integer.MAX_VALUE
- 工作队列是
SynchronousQueue
同步队列 - 非核心线程空闲存活时间为
60
秒
- 核心线程数为
- 工作机制:
-
- 提交线程任务
- 因为核心线程数为
0
,所以任务直接加到SynchronousQueue
工作队列 - 判断是否有空闲线程,如果有,就去取出任务执行
- 如果没有空闲线程,就新建一个线程执行
- 执行完任务的线程,还可以存活
60
秒,如果在这期间,接到任务,可以继续存活下去;否则,被销毁。
- 使用场景: 用于并发执行大量短期的小任务。
SingleThreadExecutor
单线程化的线程池
- 线程池参数:
-
- 核心线程数为
1
- 最大线程数也为
1
- 阻塞队列是
LinkedBlockingQueue
- 非核心线程空闲存活时间为
0
秒
- 核心线程数为
- 使用场景: 适用于串行执行任务的场景,将任务按顺序执行。
ScheduledThreadPool
能实现定时、周期性任务的线程池
- 线程池参数:
- 最大线程数为
Integer.MAX_VALUE
- 阻塞队列是
DelayedWorkQueue
keepAliveTime
为0
- 阻塞队列是
- 使用场景: 周期性执行任务,并且需要限制线程数量的需求场景。
最后
以上就是温柔黑夜为你收集整理的什么是线程池?什么是线程池?线程池常用的类和接口线程池的常用方法: 线程池分类: FixedThreadPool线程池CachedThreadPool线程池ScheduledThreadPool线程池线程池的执行流程线程池的配置参数阻塞工作队列BlockingQueue常用BlockingQueue阻塞队列实现类 线程池的状态线程池分类总结的全部内容,希望文章能够帮你解决什么是线程池?什么是线程池?线程池常用的类和接口线程池的常用方法: 线程池分类: FixedThreadPool线程池CachedThreadPool线程池ScheduledThreadPool线程池线程池的执行流程线程池的配置参数阻塞工作队列BlockingQueue常用BlockingQueue阻塞队列实现类 线程池的状态线程池分类总结所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复