概述
概述:
线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
线程池的优势:
1.降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗;
2.提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行;
3.方便线程并发数的管控。因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换(cpu切换线程是有时间成本的(需要保持当前执行线程的现场,并恢复要执行线程的现场))。
4.提供更强大的功能,延时定时线程池。
Java线程池的使用方式有如下几种:
一、JDK 内置线程池有4种(JDK1.5之后)
1.固定线程数的线程池(newFixedThreadPool)
这种线程池里面的线程被设计成存放固定数量的线程,具体线程数可以考虑为CPU核数*N(N可大可小,取决于并发的线程数,计算机可用的硬件资源等)。可以通过下面的代码来获取当前计算机的CPU的核数。
int processors = Runtime.getRuntime().availableProcessors();
FixedThreadPool 是通过 java.util.concurrent.Executors 创建的 ThreadPoolExecutor 实例。这个实例会复用 固定数量的线程处理一个共享的无边界队列 。任何时间点,最多有 nThreads 个线程会处于活动状态执行任务。如果当所有线程都是活动时,有多的任务被提交过来,那么它会一致在队列中等待直到有线程可用。如果任何线程在执行过程中因为错误而中止,新的线程会替代它的位置来执行后续的任务。所有线程都会一致存于线程池中,直到显式的执行 ExecutorService.shutdown() 关闭。由于阻塞队列使用了LinkedBlockingQueue,是一个无界队列,因此永远不可能拒绝任务。LinkedBlockingQueue在入队列和出队列时使用的是不同的Lock,意味着他们之间不存在互斥关系,在多CPU情况下,他们能正在在同一时刻既消费,又生产,真正做到并行。因此这种线程池不会拒绝任务,而且不会开辟新的线程,也不会因为线程的长时间不使用而销毁线程。这是典型的生产者----消费者问题,这种线程池适合用在稳定且固定的并发场景,比如服务器。下面代码给出一个固定线程数的DEMO,每个核绑定了5个线程。
案例:
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test {
public static void main(String[] args) {
// 获取计算机有几个核
int processors = Runtime.getRuntime().availableProcessors();
// 第一种线程池:固定个数的线程池,可以为每个CPU核绑定一定数量的线程数
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(processors * 5)
for (int i = 0; i < 10; i++) {
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
}
fixedThreadPool.shutdown();
}
}
2.缓存的线程池(newCachedThreadPool)
核心池大小为0,线程池最大线程数目为最大整型,这意味着所有的任务一提交就会加入到阻塞队列中。当线程池中的线程60s没有执行任务就终止,阻塞队列为SynchronousQueue。SynchronousQueue的take操作需要put操作等待,put操作需要take操作等待,否则会阻塞(线程池的阻塞队列不能存储,所以当目前线程处理忙碌状态时,所以开辟新的线程来处理请求),线程进入wait set。总结下来:①这是一个可以无限扩大的线程池;②适合处理执行时间比较小的任务;③线程空闲时间超过60s就会被杀死,所以长时间处于空闲状态的时候,这种线程池几乎不占用资源;④阻塞队列没有存储空间,只要请求到来,就必须找到一条空闲线程去处理这个请求,找不到则在线程池新开辟一条线程。如果主线程提交任务的速度远远大于CachedThreadPool的处理速度,则CachedThreadPool会不断地创建新线程来执行任务,这样有可能会导致系统耗尽CPU和内存资源,所以在使用该线程池是,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题。
案例:
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test {
public static void main(String[] args) {
// 缓存线程池,无上限
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
cachedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
});
}
cachedThreadPool.shutdown();
}
}
3.单个线程的线程池(newSingleThreadExecutor)
SingleThreadExecutor是使用单个worker线程的Executor,作为单一worker线程的线程池,SingleThreadExecutor把corePool和maximumPoolSize均被设置为1,和FixedThreadPool一样使用的是无界队列LinkedBlockingQueue,所以带来的影响和FixedThreadPool一样。对于newSingleThreadExecutor()来说,也是当线程运行时抛出异常的时候会有新的线程加入线程池替他完成接下来的任务。创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行,所以这个比较适合那些需要按序执行任务的场景。比如:一些不太重要的收尾,日志等工作可以放到单线程的线程中去执行。日志记录一般情况会比较慢(数据量大一般可能不写入数据库),顺序执行会拖慢整个接口,堆积更多请求,还可能会对数据库造成影响(事务在开启中),所以日志记录完全可以扔到单线程的线程中去,一条条的处理,也可以认为是一个单消费者的生产者消费者模式。
案例:
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Test {
public static void main(String[] args) {
// 单一线程池,永远会维护存在一条线程
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int j = i;
singleThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":" + j);
}
});
}
singleThreadPool.shutdown();
}
}
4.固定个数的线程池(newScheduledThreadPool)
相比于第一个固定个数的线程池强大在 ①可以执行延时任务,②也可以执行带有返回值的任务
案例:
public class Test {
public static void main(String[] args) throws InterruptedException, ExecutionException{
// 第四种线程池:固定个数的线程池,相比于第一个固定个数的线程池 强大在 ①可以执行延时任务,②也可以执行
// 有返回值的任务。
// scheduledThreadPool.submit(); 执行带有返回值的任务
// scheduledThreadPool.schedule() 用来执行延时任务.
// 固定个数的线程池,可以执行延时任务,也可以执行带有返回值的任务。
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
FutureTask<String> ft = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("hello");
return Thread.currentThread().getName();
}
});
scheduledThreadPool.submit(ft);
// 通过FutureTask对象获得返回值.
String result = ft.get();
System.out.println("result : " + result);
// 执行延时任务
scheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " : bobm!");
}
}, 3L, TimeUnit.SECONDS);
}
}
二、Spring线程池
多线程并发处理起来通常比较麻烦,如果你使用spring容器来管理业务bean,事情就好办了多了。spring封装了Java的多线程的实现,你只需要关注于并发事物的流程以及一些并发负载量等特性,具体来说如何使用spring来处理并发事务。
线程池的启用有两种方式:配置文件或者注解
配置文件:新增spring的配置文件spring-threadpool.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.1.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"
default-autowire="byName">
<description>流量消息spring线程池配置</description>
<!-- 缺省的异步任务线程池 -->
<task:annotation-driven executor="messageExecutor"/>
<task:executor id="asyncExecutor" pool-size="100-10000" queue-capacity="10"/>
<!-- 处理message的线程池 -->
<task:executor id="messageExecutor" pool-size="15-50" queue-capacity="100" keep-alive="60"
rejection-policy="CALLER_RUNS"/>
</beans>
注解:使用@EnableAsync标注启用spring线程池,@Async将方法标注为异步方法,spring扫描到后,执行该方法时,会另起新线程去执行,非常简单
package cn.leadeon.message.test;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Component;
/**
* @author LiJunJun
* @since 2018/10/11
*/
@Component
@EnableAsync
public class AsyncTest {
@Async
public void test1() {
System.out.println("异步执行test1!!!");
System.out.println("线程id:" + Thread.currentThread().getId());
System.out.println("线程名称:" + Thread.currentThread().getName());
}
@Async
public void test2() {
System.out.println("异步执行test2!!!");
System.out.println("线程id:" + Thread.currentThread().getId());
System.out.println("线程名称:" + Thread.currentThread().getName());
}
@Async
public void test3() {
System.out.println("异步执行test3!!!");
System.out.println("线程id:" + Thread.currentThread().getId());
System.out.println("线程名称:" + Thread.currentThread().getName());
}
}
使用注解引入配置文件或者在自己的spring配置文件中import即可。
package cn.leadeon.message.test;
import org.springframework.context.annotation.ImportResource;
import org.springframework.scheduling.annotation.Async; importorg.springframework.stereotype.Component; /**
这里需要注意的是@EnableAsync注解与等价,两者只能使用其一,不然启动会报错。
三、自己定制线程池
首先自定义线程基类,自定义异常处理器,自定义线程名(自定义的异常处理器用excute方法提交线程才有效,用submit方法提交的线程,异常会封装在Future对象中返还给调用者)。
class MyAppThread extends Thread{
private static final String DEFAULT_NAME="TomDog";
private static volatile boolean debugLifeCycle = true;
private static final AtomicInteger created=new AtomicInteger();
private static final AtomicInteger alive=new AtomicInteger();
private static final Logger log=Logger.getAnonymousLogger();
public MyAppThread(Runnable runnable){
this(runnable,DEFAULT_NAME);
}
public MyAppThread(Runnable runnable,String name){
super(runnable,name+"-"+created.incrementAndGet());
setUncaughtExceptionHandler(
new Thread.UncaughtExceptionHandler(){
public void uncaughtException(Thread t, Throwable e) {
System.out.println("UNCAUGHT in thread "+t.getName());
}
}
);
}
@Override
public void run(){
boolean debug=debugLifeCycle;
if (debug){
System.out.println("Created "+ getName());
}
try {
alive.decrementAndGet();
super.run();
}finally {
alive.decrementAndGet();
if (debug){
System.out.println("Exiting "+getName());
}
}
}
public static int getCreated() {
return created.get();
}
public static int getAlive() {
return alive.get();
}
public static void setDebug(boolean debugLifeCycle) {
MyAppThread.debugLifeCycle = debugLifeCycle;
}
}
有了线程基类接下来创建自己的线程工场,线程工场顾名思义就是生产线程的工场。
class MyThreadFactory implements ThreadFactory{
private final String threadName;
MyThreadFactory(String threadName){
this.threadName=threadName;
}
public Thread newThread(Runnable r) {
return new MyAppThread(r,threadName);
}
}
接下来创建自定义线程池,来实现记录任务的执行时间,可以自定义重写ThreadPoolExecutor的beforeExecute,afterExecute, terminated方法来实现记录每次任务的执行时间,任务的平均执行时间。
class MyTimingThreadPool extends ThreadPoolExecutor{
private final ThreadLocal<Long> startTime
=new ThreadLocal<Long>();
private final AtomicLong numTasks=new AtomicLong();
private final AtomicLong totalTime=new AtomicLong();
public MyTimingThreadPool(int corePoolSize, int maximumPoolSize, longkeepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void beforeExecute(Thread thread,Runnable runnable){
super.beforeExecute(thread,runnable);
System.out.println(String.format("Thread %s:start %s",thread,runnable));
startTime.set(System.nanoTime());
}
@Override
protected void afterExecute(Runnable runnable,Throwable throwable){
try {
long endTime=System.nanoTime();
long taskTime=endTime-startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
System.out.println(String.format(" Thread %s :end %s, time=%dns",throwable,runnable,taskTime));
}finally {
super.afterExecute(runnable,throwable);
}
}
@Override
protected void terminated(){
try {
System.out.println(String.format("Terminated : avg time=%dns",totalTime.get()/numTasks.get()));
}finally {
super.terminated();
}
}
}
根据ThreadPoolExecutor的构造参数可知,我们还需要为其指定任务队列,饱和策略,核心线程数,最大线程数,线程活跃时间,这些根据自己的业务场景来配置就可以了。
对于cpu密集型的任务,可以指定线程的个数为cpu的核数+1 .在java中可以通过RunTime.getRunTime().availableProcessors()来获取。
接下来new一个自定义的线程池:
ThreadPoolExecutor executor=new MyTimingThreadPool(CPU_COUNT,maxPoolSize,keepAliveTime,timeUnit,blockingQueue,
threadFactory ,rejectedExecutionHandler);
这时自己定制的线程池就完成了。
其实我们实际开发的系统中,tomcat有几个线程池,dubbo有几个线程池,kafka client中有几个线程池,zookeeper client中存在着几个线程池.......完成一个请求数据是怎么在多个线程之间传递的,这些都需要我们对感兴趣的去了解清除,才能更明确的了解线程池的应用场景。
总结:线程池在开发中一定会用到,如果能像golang一样,java语言也有协程,也许java程序员就少了一种包袱。看完这篇文章,你学会了么?每日会分享技术文章,更多学习视频请关注:https://member.bilibili.com/platform/upload-manager/article
最后
以上就是害羞纸鹤为你收集整理的必看!!!全网最详细的线程池使用方式,附源码实例的全部内容,希望文章能够帮你解决必看!!!全网最详细的线程池使用方式,附源码实例所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复