我是靠谱客的博主 包容香氛,最近开发中收集的这篇文章主要介绍线程池讲解线程池介绍线程池使用调度线程池ForkJoin框架,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

目录

线程池介绍

什么是线程池

为什么使用线程池

如何使用原生创建线程池

其他三种创建线程池的方式,有风险,慎用:

线程池使用

提交任务的2种方式:

Future的方法:

关闭线程池

线程池状态及生命周期

线程池是如何执行任务的:

如何执行批量任务

调度线程池

执行定时、延时的任务

Executors工具类四个方法:

调度线程池执行延时且一次任务的两个方法:

执行周期,重复性的任务:

ForkJoin框架

什么是ForkJoin框架

ForkJoin线程池

创建线程池

这个线程池执行的任务 有两个类:

如何定义任务呢?

案例:

CompletionService

监控线程池

主要监控两个点:

如何使用:


线程池介绍

什么是线程池

线程池是一种基于池化管理线程的工具。(类似于创建一个容器,容器里面装有很多很多的线程,有任务需要执行的时候,就扔给容器,容器使用它的线程去执行这些任务)

为什么使用线程池

* 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的销毁。

* 提高相应速度:当有任务时,任务可以不需要等待线程的创建就能立刻执行。

* 提高线程的可管理型:线程线程池可以进行统一的分配,调优和监控

如何使用原生创建线程池

ThreadPoolExecutor:四种构造方法

ThreadPoolExecutor(int,int,long,TimeUnit,BlockingQueue<Runnable>)

ThreadPoolExecutor(int,int,long,TimeUnit,BlockingQueue<Runnable>,ThreadFactory)

ThreadPoolExecutor(int,int,long,TimeUnit,BlockingQueue<Runnable>,RejectExecutionHandler)

ThreadPoolExecutor(int,int,long,TimeUnit,BlockingQueue<Runnable>,ThreadFactory,RejectExecutionHandler)

以最后一个构造方法说明,参数分别代表什么:

ThreadPoolExecutor(核心线程数,最大线程数,空闲时间线程存活时间,时间单位,任务队列,线程工厂,任务拒绝策略)

核心线程:只要线程池不关闭,就不会被销毁

最大线程数:线程池的最大线程数(核心线程+非核心线程),非核心线程没有执行任务的话是要被清理的,那么多久没执行任务会被清理呢?这就看第3 第4 个参数了,空闲时间线程存活时间,时间单位。

任务队列:当核心线程占满时(指所有的核心线程都在工作),这时候如果有新的任务来到线程池的话,就会放到任务队列中进行排队。 

任务队列常用的有 :LinkedBlockingQueue(链式阻塞队列)、ArrayBlockingQueue(数组阻塞队列)

线程工厂:用于创建线程池中线程的工厂,它是一个接口,实现它的newThread方法,我们可以在方法内写线程的创建实现,然后自定义设置线程属性, 比如线程名字、是否为后台线程。   如果不想执行相关设置的话,我们可以使用默认工厂  Executor.defaultThreadFactory()

任务拒绝策略:当线程池线程已经达到最大线程数,并且线程全部都在工作,然后队列也已经排队排满了,这时候就会采用这个任务拒绝策略。

拒绝策略有四种:(new ThreadPoolExecutor.拒绝策略)

AbortPolicy:默认的拒绝策略,抛出RejectedExecutionException异常

DiscardPolicy:直接丢弃任务

DiscardOldestPolicy:丢弃处于任务队列头部的任务,添加这个新进来的任务到尾部。

CallerRunsPolicy:使用调用者线程直接执行被拒绝的任务。

其他三种创建线程池的方式,有风险,慎用:

FixedThreadPool(固定大小的线程池)、SingleThreadExecutor(单个线程的线程池)、CachedThreadPool(可缓存的线程池)

这三种创建线程池的方法都在Executors工具类中。

 除去重载方法

 前三种内部都是采用ThreadPoolExecutor来创建线程池的,先讲前三个,后三个后面讲解。

Executors.newFixedThreadPool():这种方法创建出来的线程池,核心线程数和最大线程数一样,存活时间为0,表示不会被销毁,采用的队列是LinkedBlockingQueue,这个队列的长度是 2^31,没有这么大的内存给它装,容易导致OOM(不推荐使用)

重载的方法使用:

Executors.newFixedThreadPool(int):创建固定大小的线程池

Executors.newFixedThreadPool(int,ThreadFactory):创建固定大小的线程池,并指定线程工厂

Executors.newSingleThreadExector():创建单个线程的线程池,核心线程和最大线程数一样,都是1,存活时间是0,表示不会被销毁,风险和 FixedThreadPool 一样 采用的队列是LinkedBlockingQueue,这个队列的长度是 2^31,没有这么大的内存给它装,容易导致OOM(不推荐使用)

重载的方法使用:

Executors.newSingleThreadExector(ThreadFactory):创建单个线程的线程池,并指定线程工厂。

Executors.newCacheThreadPool():创建可缓存的线程池,核心线程数为0,线程总数是2^31,空闲时间60秒,因为线程数量为2^31,没有这么大的内存给它装,容易导致OOM。(不推荐使用)

重载的方法使用:

Executors.newCacheThreadPool(ThreadFactory):创建可缓存线程池,并指定线程工厂。

线程池使用

提交任务的2种方式:

execute(无返回值任务)

线程池.execute(任务)

submit(无返回值和有返回值任务)

 Runnable任务的没有返回值为什么类型也是Future呢? Future除了能拿返回值之外,还可以获取线程是否执行完成。

线程池.submit(Runnable)

线程池.submit(Runnable,T)

线程池.submit(Callable<T>)

区别:

 

Future的方法:

 

关闭线程池

线程池 .shutdown()

线程池执行shutdown之后,关闭线程池,不再接收新任务,会按照设置的拒绝策略去拒绝新的任务。但是线程池中正在执行的任务,和队列排队中的任务都会执行完。

线程池 .shutdownNow():在需要立即关闭线程池的时候使用

线程池 .shutdownNow()之后,关闭线程池,不再接收新任务,会按照设置的拒绝策略去拒绝新的任务,尝试停止正在执行的任务,返回任务队列中的任务。

两者区别:

 

线程池状态及生命周期

RUNNING:线程池正在运行,可以接收新的任务,并且也能处理任务队列中的任务

SHUTDOWN:不接收新的任务,但是可以处理任务队列中的任务

STOP:不接收新任务,也不处理任务队列中的任务,还中断处理中的任务

TIDYING:所有任务已终止,线程池中的线程数量为0

TREMINATED:线程池彻底关闭

线程池是如何执行任务的:

任务提交给线程池,首先看线程池是否有核心线程是空闲的,如果有,就将任务给到核心线程去执行。如何核心线程已经满了,就将任务放到任务队列中去。如果任务队列也满了,就创建新的线程执行提交的任务。当线程池中的线程达到了线程池的最大线程数,就采用拒绝策略去拒绝任务。

 

如何执行批量任务

 List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

这个方法的任务集合  必须是Callable类型的任务。 这个方法是按顺序执行任务的,也是按顺序返回结果的。

 List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout,TimeUnit unit)

指定时间内完成任务,时间到了还没完成的,全部取消

T invokeAny(Collection<? extends Callable<T>> tasks,tasks,long timeout,TimeUnit unit)

指定时间内完成任务,时间到了还没完成的,全部取消,返回最先完成的任务的结果。

调度线程池

执行定时、延时的任务

调度线程池指具备执行定时、延时、周期型任务的线程池(ScheduledThreadPoolExecutor

如何创建?可以通过构造方法,也可以通过Executors工具类创建。

Executors工具类四个方法:

newSingleThreadScheduledExecutor() : 创建池中只有一个线程的调度线程池
newSingleThreadScheduledExecutor(ThreadFactory threadFactory) :创建池中只有一个线程的调度线程池并指定线程工厂
newScheduledThreadPool(int corePoolSize) : 指定核心线程数的调度线程池
newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) :指定核心线程数的调度线程池并指定线程工厂

调度线程池执行延时且一次任务的两个方法:

ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit)

延时执行Runnable任务,只执行一次,返回一个任务结果,由于Runnable任务没有执行结果,所以只起到一个查看任务完成情况和停止任务的作用。

<V>ScheduledFuture<V> schedule(Callable<V> callable,long delat,TimeUnit unit)

延时执行Callable任务,只执行一次,返回一个任务结果。

执行周期,重复性的任务:

需要使用 ScheduledExecutorService 中的两个方法

ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,TimeUnit unit);

执行周期性任务,周期性为固定时间(固定时间:每次执行任务的时间相同,周期为1秒,则每隔1秒就执行一次任务,不管这个任务执行需要多少时间)

ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,TimeUnit unit);

执行周期性任务,周期性为间隔时间(间隔时间:任务执行完后,间隔一段时间都再执行任务)

例子:假如任务执行都需要两秒钟,周期性都是1秒钟,那么两个方法执行的情况如下图:

 

ForkJoin框架

什么是ForkJoin框架

ForkJoin是吧一个大任务分隔成若干个小任务,再对每个任务得到的结果进行汇总,得到大任务结果。

 

ForkJoin线程池

采用ForkJoin框架的线程池(ForkJoinPool)

创建线程池

ForkJoinPool forkJoinPool = new ForkJoinPool();

这个线程池执行的任务 有两个类:

RecursiveTask(有返回值):相当于Callable

RecursiveAction(无返回值): 相当于Runnable

如何定义任务呢?

定义一个类,继承RecursiveTask 或者 RecursiveAction,重写compute方法

ForkJoinTask :相当于Future,用户接收返回结果的。

案例:

/**
 *  定义一个ForkJoinPool的任务
 * */
public class Task extends RecursiveTask<Integer> {
    //定义起始值
    private int start;
    //定义结束值
    private int end;
    //定义临界值,作为最小子任务的范围条件
    private int temp=10;

    public Task(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        //如果两个值的差小于临界值,说明是最小的子任务了,无需拆分
        if ((end-start)<temp){
            //计算两个数的叠加
            int sum = 0;
            for (int i = start;i<=end;i++){
                sum = sum+i;
            }
            return sum;
        }else{
            //大于临界值,还可以继续分
            int middle = (start+end)/2;
            //定义子任务
            Task task1 = new Task(start,middle);
            Task task2 = new Task(middle+1,end);
            //向线程中添加子任务
            task1.fork();
            task2.fork();
            //join()为获取子任务结果。
            return task1.join() + task2.join();
        }
    }
}

 

public class Test {
    public static void main(String[] args) {
        //定义任务
        Task task = new Task(1,100);
        //创建线程池
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //执行任务
        ForkJoinTask<Integer> submit = forkJoinPool.submit(task);
        Integer integer = null;
        try {
            //获取结果
            integer = submit.get();
            System.out.println(integer);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }finally {
            forkJoinPool.shutdown();
        }

    }
}

运行结果:

 

CompletionService

先执行完先返回

CompletionService<V>是一个接口  ,唯一实现类是 ExecutorCompletionService<V>

两个构造方法:

public ExecutorCompletionService(Executor executor)

public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue)

依赖于线程池创建实例。

方法:

 take() 一次返回一次结果,遍历执行take()获取结果。

监控线程池

主要监控两个点:

监控线程的变化情况

监控任务的变化情况

如何使用:

 

public class MonitorThreadPool extends ThreadPoolExecutor{
    public MonitorThreadPool(int corePoolSize, int maximumPoolSize,
                             long keepAliveTime, TimeUnit unit,
                             BlockingQueue<Runnable> workQieie){
        super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQieie);
    }

    //每次任务执行前调用
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        monitor();
    }

    //每次任务执行后调用
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        monitor();
    }

    //线程池关闭前调用
    @Override
    protected void terminated() {
        monitor();
    }

    public void monitor(){
        System.out.print("正在工作的线程数:"+getActiveCount()+"t");
        System.out.print("当前存在的线程数:"+getPoolSize()+"t");
        System.out.print("历史最大线程数:"+getLargestPoolSize()+"t");
        System.out.print("已提交任务数:"+getTaskCount()+"t");
        System.out.print("已完成任务数数:"+getCompletedTaskCount()+"t");
        System.out.println("队列中的任务数:"+getQueue().size());
    }

}

然后创建这个线程池对象使用,就会有监控的效果了。

最后

以上就是包容香氛为你收集整理的线程池讲解线程池介绍线程池使用调度线程池ForkJoin框架的全部内容,希望文章能够帮你解决线程池讲解线程池介绍线程池使用调度线程池ForkJoin框架所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(61)

评论列表共有 0 条评论

立即
投稿
返回
顶部