概述
阻塞队列与线程池总结
- 一、阻塞队列
- 1.1 阻塞队列接口结构和实现类
- 1.2 BlockingQueue核心方法
- 1.3 阻塞队列API的使用
- 1.4 阻塞队列之同步SynchronousQueue队列
- 1.5 生产者消费者模式(lock实现)
- 1.5 生产者和消费者(阻塞队列实现)
- 二、线程池相关
- 2.1 Callable接口
- 2.2 线程池的架构及优势
- 2.3 线程池的使用
- 2.4 线程池7大参数
- 2.5 线程池底层工作原理
- 2.6 拒绝策略
- 2.7 为什么不用默认创建的线程池?
- 2.8 手写线程池(ThreadPoolExecutor)
- 2.9 线程池配置合理线程数
一、阻塞队列
1.1 阻塞队列接口结构和实现类
阻塞队列,排队拥堵,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如下图所示:
线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素
-
当阻塞队列是空时,从队列中获取元素的操作将会被阻塞
- 当蛋糕店的柜子空的时候,无法从柜子里面获取蛋糕
-
当阻塞队列是满时,从队列中添加元素的操作将会被阻塞
- 当蛋糕店的柜子满的时候,无法继续向柜子里面添加蛋糕了
也就是说 试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其它线程往空的队列插入新的元素
同理,试图往已经满的阻塞队列中添加新元素的线程,直到其它线程往满的队列中移除一个或多个元素,或者完全清空队列后,使队列重新变得空闲起来,并后续新增
为什么要用?
在多线程领域:所谓的阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动唤醒
为什么需要BlockingQueue?
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都帮你一手包办了
在concurrent包发布以前,在多线程环境下,我们每个程序员都必须自己取控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
架构介绍
BlockingQueue阻塞队列是属于一个接口,底下有七个实现类
- ArrayBlockQueue:由数组结构组成的有界阻塞队列
- LinkedBlockingQueue:由链表结构组成的有界(但是默认大小 Integer.MAX_VALUE)的阻塞队列
- 有界,但是界限非常大,相当于无界,可以当成无界
- PriorityBlockQueue:支持优先级排序的无界阻塞队列
- DelayQueue:使用优先级队列实现的延迟无界阻塞队列
- SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列
- 生产一个,消费一个,不存储元素,不消费不生产
- LinkedTransferQueue:由链表结构组成的无界阻塞队列
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列
1.2 BlockingQueue核心方法
方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
检查 | element() | peek() | 不可用 | 不可用 |
性质 | 说明 |
---|---|
抛出异常 | 当阻塞队列满时:在往队列中add插入元素会抛出 IIIegalStateException:Queue full 当阻塞队列空时:再往队列中remove移除元素,会抛出NoSuchException |
特殊性 | 插入方法,成功true,失败false 移除方法:成功返回出队列元素,队列没有就返回空 |
一直阻塞 | 当阻塞队列满时,生产者继续往队列里put元素,队列会一直阻塞生产线程直到put数据or响应中断退出。 当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用。 |
超时退出 | 当阻塞队列满时,队里会阻塞生产者线程一定时间,超过限时后生产者线程会退出 |
1.3 阻塞队列API的使用
-
抛出异常组
但执行add方法,向已经满的ArrayBlockingQueue中添加元素时候,会抛出 IIIegalStateException:Queue full,当阻塞队列空时:再往队列中remove移除元素,会抛出NoSuchException
public class BlockingQueueExceptionDemo { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("b")); System.out.println(blockingQueue.add("c")); try { //抛出 java.lang.IllegalStateException: Queue full System.out.println(blockingQueue.add("XXX")); } catch (Exception e) { System.err.println(e); } System.out.println(blockingQueue.element());//a System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); try { //抛出 java.util.NoSuchElementException System.out.println(blockingQueue.remove()); } catch (Exception e) { System.err.println(e); } try { //element()相当于peek(),但element()会抛NoSuchElementException System.out.println(blockingQueue.element()); } catch (Exception e) { System.err.println(e); } } }
执行结果:
true true true java.lang.IllegalStateException: Queue full a a b c java.util.NoSuchElementException java.util.NoSuchElementException
-
布尔类型组
我们使用 offer的方法,添加元素时候,如果阻塞队列满了后,会返回false,否者返回true
在poll取元素的时候,如果队列已空,那么会返回null
BlockingQueue blockingQueue = new ArrayBlockingQueue(3); System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("c")); System.out.println(blockingQueue.offer("d")); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll());
运行结果:
true true true false a b c null
-
阻塞队列组
当阻塞队列满时,生产者继续往队列里put元素,队列会一直阻塞生产线程直到put数据or响应中断退出。
当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用。一般在消息中间件,比如RabbitMQ中会使用到,因为需要保证消息百分百不丢失,因此只有让它阻塞
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue.put("a"); blockingQueue.put("b"); blockingQueue.put("c"); System.out.println("================"); blockingQueue.take(); blockingQueue.take(); blockingQueue.take(); blockingQueue.take();//将被阻塞
-
超时退出组
使用offer插入的时候,需要指定时间,如果2秒还没有插入,那么就放弃插入;同时取的时候也进行判断,如果2秒内取不出来,那么就返回nullpublic class BlockingQueueTimeoutDemo { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println("Offer."); System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("b", 2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("c", 2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("d", 2L, TimeUnit.SECONDS)); System.out.println("Poll."); System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS)); } }
输出结果:
Offer. true true true false Poll. a b c null
1.4 阻塞队列之同步SynchronousQueue队列
SynchronousQueue没有容量。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
下面我们测试SynchronousQueue添加元素的过程
首先我们创建了两个线程,一个线程用于生产,一个线程用于消费
生产的线程分别put了 A、B、C这三个字段
package com.zb.demo;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "t put A ");
blockingQueue.put("A");
System.out.println(Thread.currentThread().getName() + "t put B ");
blockingQueue.put("B");
System.out.println(Thread.currentThread().getName() + "t put C ");
blockingQueue.put("C");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
try {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "t take A ");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "t take B ");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
blockingQueue.take();
System.out.println(Thread.currentThread().getName() + "t take C ");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
}
}
执行结果:
t1 put A
t2 take A
t1 put B
t2 take B
t1 put C
t2 take C
我们从最后的运行结果可以看出,每次t1线程向队列中添加阻塞队列添加元素后,t1输入线程就会等待 t2消费线程,t2消费后,t2处于挂起状态,等待t1在存入,从而周而复始,形成 一存一取的状态
1.5 生产者消费者模式(lock实现)
一个初始值为0的变量,两个线程对其交替操作,一个加1,一个减1,来5轮
关于多线程的操作,我们需要记住下面几句
- 线程 操作 资源类
- 判断 干活 通知
- 防止虚假唤醒机制
完整代码:
class ShareData {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment() throws Exception{
// 加锁
lock.lock();
try {
// 判断
while(number != 0) {
// 等待不能生产
condition.await();
}
// 干活
number++;
System.out.println(Thread.currentThread().getName() + "t " + number);
// 通知 唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();//释放锁要放在finally中
}
}
public void decrement() throws Exception{
// ,加锁
lock.lock();
try {
// 判断
while(number == 0) {
// 等待不能消费
condition.await();
}
// 干活
number--;
System.out.println(Thread.currentThread().getName() + "t " + number);
// 通知 唤醒
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class prodConsumer_TraditionDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
// 生产
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "生产者").start();
// 消费
new Thread(() -> {
for (int i = 0; i < 5; i++) {
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "消费者").start();
}
}
注意:防止虚假唤醒机制
但是我们对共享变量进行判断的时候,为了防止出现虚假唤醒机制,不能使用if来进行判断,而应该使用while。因为当线程被唤醒时候必须再进行一次判断。
// 判断
while(number != 0) {
// 等待不能生产
condition.await();//当线程从阻塞状态被唤醒的时候必须再进行判断
}
1.5 生产者和消费者(阻塞队列实现)
在concurrent包发布以前,在多线程环境下,我们每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程安全,则这会给我们的程序带来不小的时间复杂度。而使用阻塞队列的话,就不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程
新版的阻塞队列版生产者和消费者,使用:volatile、CAS、atomicInteger、BlockQueue、线程交互、原子引用
class MyResource{
private volatile boolean flag = true;//true表示生产,false表示消费
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public MyResource(BlockingQueue<String> blockingQueue){
this.blockingQueue = blockingQueue;
}
//生产者进行生产
public void myProd() throws Exception {
String data = null;
boolean retValue;
while(flag){
data = atomicInteger.incrementAndGet()+"";
retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS);
if(retValue){
System.out.println(Thread.currentThread().getName()+"t插入队列"+data+"成功");
}else{
System.out.println(Thread.currentThread().getName()+"t插入队列"+data+"失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName()+"t大老板叫停了,表示FLAG=false");
}
public void myConsumer() throws Exception{
String result;
while(flag){
result = blockingQueue.poll(2L, TimeUnit.SECONDS);
if(result == null || "".equalsIgnoreCase(result)){
flag = false;
System.out.println(Thread.currentThread().getName()+"t 超过两秒没有取到蛋糕,消费退出");
System.out.println();
System.out.println();
return ;
}
System.out.println(Thread.currentThread().getName()+"t 消费队列蛋糕"+result+"成功");
}
}
public void stop(){
this.flag = false;
}
}
public class ProducerConsumerWithBlockingQueueDemo {
public static void main(String[] args) {
MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(3));
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"t----生产线程启动");
try {
myResource.myProd();
} catch (Exception e) {
e.printStackTrace();
}
},"prod").start();
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"t-----消费者线程启动");
try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
},"Consumer").start();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
myResource.stop();
System.out.println("n5秒钟时间到,大老板main线程叫停,活动结束n");
}
}
最后运行结果:
Consumer -----消费者线程启动
prod ----生产线程启动
prod 插入队列1成功
Consumer 消费队列蛋糕1成功
prod 插入队列2成功
Consumer 消费队列蛋糕2成功
prod 插入队列3成功
Consumer 消费队列蛋糕3成功
prod 插入队列4成功
Consumer 消费队列蛋糕4成功
prod 插入队列5成功
Consumer 消费队列蛋糕5成功
5秒钟时间到,大老板main线程叫停,活动结束
prod 大老板叫停了,表示FLAG=false
Consumer 超过两秒没有取到蛋糕,消费退出
二、线程池相关
2.1 Callable接口
- callable接口概述
从Java 5开始,Java提供了Callable接口,该接口怎么看都像是Runnable接口的增强版,Callable接口提供了一个call()方法可以作为线程执行体,但call()方法比run()方法功能更强大。
- call()方法可以有返回值
- call()方法可以声明抛出异常
问题是:Callable接口是Java 5新增的接口,而且它不是Runnable接口的子接口,所以Callable对象不能直接作为Thread的target。
-
Future接口概述
Java 5提供了Future接口来代表Callable接口里call()方法的返回值,并为Future接口提供了一个
FutureTask
实现类,该实现类实现了Future接口和Runnable接口可以作为Thread类的target。在Future接口里定义了如下几个公共方法来控制它关联的Callable任务- boolcan cancel(boolean maylnterruptltRunning):试图取消该Future里关联的Callable任务
- V
get()
:返回Callable任务里call()方法的返回值。调用该方法将导致程序阻塞,必须等到子线程结束后才会得到返回值 - V get(long timeout,TimeUnit unit):返回Callable任务里call()方法的返回值。该方法让程序最多阻塞timeout和unit指定的时间,如果经过指定时间后Callable任务依然没有返回值, 将会抛出TimeoutExccption异常
- boolean isCancelled():如果在Callable任务正常完成前被取消,则返回true
- boolean isDone():妇果Callable任务已完成,则返回true
注意:Callable接口有泛型限制,Callable接口里的泛型形参类型与call()方法返回值类型相同。
-
创建并启动有返回值的线程的步骤
1. 创建一个实现Callable的实现类 2. 实现call方法,将此线程需要执行的操作声明在call()中 3. 创建Callable接口实现类的对象 4. 将此Callable接口实现类的对象作为传递到FutureTask构造器中,创建FutureTask的对象 5. 将FutureTask的对象作为参数传递到Thread类的构造器中,创建Thread对象,并调用start() 6. 调用FutureTask对象的get()方法来获得子线程执行结束后的返回值
代码实例:
class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName()+"come in callable");
return 1024;
}
}
public class CallableDemo {
public static void main(String[] args) throws Exception {
FutureTask<Integer> futureTask = new FutureTask(new MyThread());
new Thread(futureTask,"AA").start();
//获取call方法的返回值
int result = futureTask.get();//该方法将导致主线程被阻塞,直到call()方法结束并返回为止
System.out.println(result);
}
}
2.2 线程池的架构及优势
架构说明:
Java中线程池是通过Executor框架实现的,该框架中用到了Executor,Executors(代表工具类),ExecutorService,ThreadPoolExecutor这几个类。
为什么用线程池
线程池做的主要工作就是控制运行的线程的数量,处理过程中,将任务放入到队列中,然后线程创建后,启动这些任务,如果线程数量超过了最大数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。
它的主要特点为:线程复用、控制最大并发数、管理线程
线程池的好处
- 降低资源消耗。通过重复利用已创建的线程,降低线程创建和销毁造成的消耗
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就立即执行
- 提高线程的可管理性。线程是稀缺资源,如果无线创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控
2.3 线程池的使用
Executors.newFixedThreadPool(int i)
:创建一个拥有 i 个线程的线程池- 执行长期的任务,性能好很多
- 创建一个定长线程池,可控制线程数最大并发数,超出的线程会在队列中等待
Executors.newSingleThreadExecutor()
:创建一个只有1个线程的单线程池- 一个任务一个任务执行的场景
- 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行
Executors.newCachedThreadPool()
; 创建一个可扩容的线程池- 执行很多短期异步的小程序或者负载教轻的服务器
- 创建一个可缓存线程池,如果线程长度超过处理需要,可灵活回收空闲线程,如无可回收,则新建新线程
- Executors.newScheduledThreadPool(int corePoolSize):线程池支持定时以及周期性执行任务,创建一个corePoolSize为传入参数,最大线程数为整形的最大数的线程池
使用代码:
public class MyThreadPoolDemo {
public static void main(String[] args) {
//一池5个处理线程(用池化技术,一定要记得关闭)
ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
//创建一个只有一个线程的线程池
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
//创建一个拥有N个线程的线程池,根据调度创建合适的线程
ExecutorService threadPool = Executors.newCachedThreadPool();
try{
//模拟10个用户来办理业务,每个用户就是一个来自外部请求线程
for(int i = 1;i<=10;i++){
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"t 办理业务");
}
});
}
}catch(Exception e){
e.printStackTrace();
}finally{
threadPool.shutdown();
}
}
}
2.4 线程池7大参数
介绍7大参数之前先看一下三个常用的创建线程方法的底层源码:
-
newFixedThreadPool(int nThreads)
底层使用了LinkedBlockingQueue 链表阻塞队列 -
newSingleThreadExecutor()
-
newCacheThreadPool()
底层用的是 SynchronousBlockingQueue阻塞队列
我们通过查看源码,发现三个常用的创建线程方法底层都是使用了
ThreadPoolExecutor
。
ThreadPoolExecutor初始化方法的源码:(参数即为线程池的七大参数)
线程池在创建时的7大参数:
corePoolSize
:核心线程数,线程池中的常驻核心线程数- 在创建线程池后,当有请求任务来之后,就会安排池中的线程去执行请求任务,近似理解为今日当值线程
- 当线程池中的线程数目达到corePoolSize后,就会把到达的队列放到缓存队列中
maximumPoolSize
:线程池能够容纳同时执行的最大线程数,此值必须大于等于1- 相当有扩容后的线程数,这个线程池能容纳的最多线程数
keepAliveTime
:多余的空闲线程存活时间- 当线程池数量超过corePoolSize时,当空闲时间达到keepAliveTime值时,多余的空闲线程会被销毁,直到只剩下corePoolSize个线程为止
- 默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用
unit
:keepAliveTime的单位workQueue
:任务队列,被提交的但未被执行的任务(类似于银行里面的候客区)- LinkedBlockingQueue:链表阻塞队列
- SynchronousBlockingQueue:同步阻塞队列
- threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程池 一般用默认即可
- handler:拒绝策略,表示当队列满了并且工作线程大于线程池的最大线程数(maximumPoolSize3)时,如何来拒绝请求执行的Runnable的策略
2.5 线程池底层工作原理
- 在创建了线程池后,等待提交过来的任务请求
- 当调用execute()方法添加一个请求任务时,线程池会做出如下判断
- 如果正在运行的线程池数量小于corePoolSize,那么马上创建线程运行这个任务
- 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列
- 如果这时候队列满了,并且正在运行的线程数量还小于maximumPoolSize,那么还是创建非核心线程like运行这个任务;
- 如果队列满了并且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行
- 当一个线程完成任务时,它会从队列中取下一个任务来执行
- 当一个线程无事可做操作一定的时间(keepAliveTime)时,线程池会判断:
1. 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉
2. 所以线程池的所有任务完成后,它会最终收缩到corePoolSize的大小
2.6 拒绝策略
等待队列也已经排满了,再也塞不下新任务了同时,线程池中的max线程也达到了,无法继续为新任务服务。这时候我们就需要拒绝策略机制合理的处理这个问题。
JDK拒绝策略:
- AbortPolicy:默认,直接抛出RejectedExcutionException异常,阻止系统正常运行
- DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常,如果运行任务丢失,这是一种好方案
- CallerRunsPolicy:该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者
- DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务
以上内置拒绝策略均实现了RejectedExecutionHandler接口。
2.7 为什么不用默认创建的线程池?
问题:线程池创建的方法有:固定数的,单一的,可变的,那么在实际开发中,应该使用哪个?
答:我们一个都不用,在生产环境中是使用自己自定义的
为什么不用 Executors 中JDK提供的?
根据阿里巴巴手册规定:
2.8 手写线程池(ThreadPoolExecutor)
从上面我们知道,因为默认的Executors创建的线程池,底层都是使用LinkBlockingQueue作为阻塞队列的,而LinkBlockingQueue虽然是有界的,但是它的界限是 Integer.MAX_VALUE 大概有20多亿,可以相当是无界的了,因此我们要使用ThreadPoolExecutor自己手动创建线程池,然后指定阻塞队列的大小
public class MyThreadPool {
public static void main(String[] args) {
System.out.println(Runtime.getRuntime().availableProcessors());
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());//拒绝策略
try{
//模拟10个用户来办理业务,每个用户就是一个来自外部请求线程
for(int i = 1;i<=10;i++){
threadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"t 办理业务");
}
});
}
}catch(Exception e){
e.printStackTrace();
}finally{
threadPool.shutdown();
}
}
}
2.9 线程池配置合理线程数
生产环境中如何配置 corePoolSize 和 maximumPoolSize
这个是根据具体业务来配置的,分为CPU密集型和IO密集型
- CPU密集型
- CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行
- CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程)
- 而在单核CPU上,无论你开几个模拟的多线程该任务都不可能得到加速,因为CPU总的运算能力就那些
- CPU密集型任务配置尽可能少的线程数量
- 一般公式:CPU核数 + 1个线程数
- IO密集型
方法一:
- 由于IO密集型任务线程并不是一直在执行任务,则可能多的线程,如 CPU核数 * 2
- IO密集型,即该任务需要大量的IO操作,即大量的阻塞
- 在单线程上运行IO密集型的任务会导致浪费大量的CPU运算能力花费在等待上
- 所以IO密集型任务中使用多线程可以大大的加速程序的运行,即使在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。
方法二:
- IO密集时,大部分线程都被阻塞,故需要多配置线程数:
- 参考公式:CPU核数 / (1 - 阻塞系数) 阻塞系数在0.8 ~ 0.9左右
- 例如:8核CPU:8/ (1 - 0.9) = 80个线程数
最后
以上就是奋斗酸奶为你收集整理的阻塞队列与线程池总结一、阻塞队列二、线程池相关的全部内容,希望文章能够帮你解决阻塞队列与线程池总结一、阻塞队列二、线程池相关所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复