我是靠谱客的博主 沉静刺猬,最近开发中收集的这篇文章主要介绍Executor的简单了解,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

线程池 Executor

成员

  • Executor: 是一个接口,它是Executor框架的基础,它将任务的提交和执行分离开
public interface Executor {
    void execute(Runnable command);
}

这个execute方法就行执行任务的方法。

  • ExcutorService:这个Service是个接口,ThreadPoolExecutor和ScheduledThreadpoolExecutor都是其子类。
public interface ExecutorService extends Executor {
    //关闭任务
    void shutdown();

    //尝试关闭所有任务
    List<Runnable> shutdownNow();
    
    //提交任务
    <T> Future<T> submit(Callable<T> task);
    
    //提交任务,并且接收回调
    <T> Future<T> submit(Runnable task, T result);
    //执行任务
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
}
  • ThreadPoolExecutor:是线程池的核心实现类,用来执行被提交的任务
  • ScheduledThreadpoolExecutor:是一个实现类,用来在指定的延迟时间后运行命令,或者定期执行命令。
  • Future接口和Future接口的实现类FutureTask:表示异步计算的结果
  • Runnable接口和Callable的实现类,都可以被ThreadPoolExecutor和ScheduledThreadpoolExecutor执行。

执行过程

  1. 主线程创建任务对象,也就是Runnable或Callable的实现类
  2. 主线程把任务对象交给ExcutorService执行。
  3. 通过ExcutorService返回的FetureTask对象,调用get方法等待任务执行完毕。当然,也可以通过cancel方法取消任务的执行。

创建线程池

Executor框架提供了三种不同的线程池:

  1. FixedThreadPool:表示创建使用固定线程数的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

会限制当前线程数量的应用场景,适用于负载比较重得服务器。
2. SignleThreadExecutor创建使用单个线程的线程池。

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  1. CachedThreadPool,会根据需要创建新线程的Executor。
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

这个是一个大小没有边界的线程池,适用于执行很多短期异步小任务。

ThreadPoolExecutor

主要成员

  • corePool: 核心线程池大小
  • maximumPool:最大线程池大小
  • BlockingQueue:暂时保存任务的工作队列
  • RejectedExecutionHandle:饱和时,execute方法调用的handler

针对于上面三种线程池,我们来看看其内部的执行过程。

FixedThreadPool

  1. 如果当前线程数小于corePoolSize,那么将会创建新的线程来执行任务
  2. 在线程池的运行线程与corePoolSize一样大时,将会将任务加入阻塞队列中。
  3. 之前创建出的线程执行完任务之后,将会循环的从阻塞队列中获取任务来执行。

因为是一个无界的阻塞队列,所以maximumPool无效,并且不会有饱和的情况。

SignleThreadExecutor

和上面的FixedThreadPool流程基本一致,只不过是size被设置为1.

CachedThreadPool

CachedThreadPool的corePoolSize被设置为0,maximumPoolSize被设置为Int的最大值。也就是maximumPoll是无界的,keepAliveTime设置的空闲存活时间为60s。一旦线程空闲超过60s,将会被终止。

FutureTask实现

FutureTask的三种状态

  • 未启动: 未调用run方法,调用get,导致线程阻塞,调用cancel,不会执行。
  • 启动:调用run方法,正在执行中,调用get,导致线程阻塞,调用cancel(false),无影响。调用cancel(true),中断线程
  • 已完成:正常结束、异常结束、cancel,导致线程立即返回结果或抛出异常。调用cancel,返回false

FutureTask的异步实现

是基于AbstractQueueSynchronizer,抽象同步队列,简称AQS,是一个同步框架,提供通用机制来原子性管理同步状态、阻塞和唤醒线程以及维护被阻塞线程的队列。实现类包括了可重入锁,信号量等。
每一个基于AQS实现的同步器至少会包含两种类型的操作。

  • acquire: 这个操作将阻塞线程,直到AQS允许这个线程继续执行,在FutureTask中,就是get操作
  • release:改变AQS状态,改变状态后,允许一个或多个阻塞线程被解除阻塞。在FutureTask中,是run方法和cancel方法。

FutureTask内部声明了一个AQS的子类Sync,所有的调用都会委托给Sync。

FutrureTask.get流程:
  • 内部会调用到AQS.acquireSharedInterruptibly方法,回调Sync实现的tryAcquireShared方法判断acquire操作是否可以成功。成功的条件为:state为执行完成状态或者已取消状态。
  • 如果成功则get立即返回。如果失败则到线程等待队列中去等待其他线程执行release操作。
  • 当其他线程执行了release操作并且唤醒当前线程后,再次执行tryAcquireShared将返回1.
  • 最后返回计算结果。
FutureTask.run流程
  • 执行构造函数指定的任务
  • 以CAS方式更新同步状态。
  • 调用releaseShared方法,唤醒等待队列的第一个线程。
  • 调用FutureTask.done

最后

以上就是沉静刺猬为你收集整理的Executor的简单了解的全部内容,希望文章能够帮你解决Executor的简单了解所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(43)

评论列表共有 0 条评论

立即
投稿
返回
顶部