我是靠谱客的博主 动人大米,最近开发中收集的这篇文章主要介绍java线程池源码分析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

线程池UML类图

线程池uml关系图

Executor是一个接口,定义一个 void execute(Runnable command); 接口,子类实现这个接口,可以传递一个Runnable的接口进去,来处理这个接口。

execute方法

  public void execute(Runnable command) {
        // 参数空值校验
        if (command == null)
            throw new NullPointerException();
        // 获取线程池状态
        int c = ctl.get();
        // 通过workerCountOf(c)方法获取到当前线程池的线程个数,如果小于核心线程大小,则进行addWorker(command, true)操作
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

    private boolean addWorker(Runnable firstTask, boolean core) {
        // 标记多层循环跳出位置,并重置循环
        retry:
        for (;;) {
        	// 获取线程池状态
            int c = ctl.get();
            // 根据线程池状态进行运行状态计算
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 如果线程池状态值大于SHUTDOWN 并且 线程池状态不是shutdown, firstTask 任务不为空,工作队列是空,结束并返回false
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 多重检测获取工作线程数
                int wc = workerCountOf(c);
                如果工作线程数大于容量 或者 根据core布尔值进行判断工作线程大于核心线程或者最大线程数则返回false
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // cas方式比较c的大小,如果不一致则重新进入循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // Re-read ctl 重读一次ctl值
                c = ctl.get(); 
                // 如果不相同了进行重新循环 
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
          	创建Worker对象,构造函数中进行线程创建,并持有在w对象下
            w = new Worker(firstTask);
            // 获取生成的线程
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //重新检查状态.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 把Worker加入到HashSet<Worker> workers中保存
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 如果worker增加到workers中则进行线程启动
                if (workerAdded) {
                    // 最后调用 线程的 start方法,jvm启动一个线程并等待执行worker实现的run方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    //Worker类实现Runable重写了run方法
    public void run() {
        // 调用用runWorker方法,传递Worker对象 ,因为 worker类是一个Runnable和AQS的 子类 
        runWorker(this);
    }

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // 获取工作任务
        Runnable task = w.firstTask;
        // 之后对任务置空
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
//            根据工作任务是否为空和获取任务为条件执行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 获取实现了Runable的task,执行重写的run方法,还有submit提交时的callable重写的run方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 最终处理退出时、或者getwork返回null时即对线程过期
            processWorkerExit(w, completedAbruptly);
        }
    }


    private Runnable getTask() {
		
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            // 是否允许核心线程超时回收或者工作线程数大于核心线程数,
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
			如果工作线程大于最大线程池容量或者timed为true 并且 工作线程大于1或者任务队列是空则进行工作线程减少并返回结果null
            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    // 如果超时并且workcount大于核心线程数时,进行减少WorkerCount
                if (compareAndDecrementWorkerCount(c))
                // 返回null
                    return null;
                continue;
            }

            try {
                // 如果为true,则从工作队列获取一个在最大存活时间内存活的任务,否则工作队列调用take方法阻塞等待获取
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
    	// 如果被中断进行工作线程减员
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // 移除worker线程对象,也就移除了线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
		// 尝试终止线程池
        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

    private void decrementWorkerCount() {
        // 死循环进行cas比较并减少工作任务线程数量
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

submit方法

submit 方法在ThreadPoolExecutor类的抽象父类AbstractExecutorService下的公共方法


 public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;      
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;     
    }
public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

 static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
 		// 调用FutureTask 的构造方法
        return new FutureTask<T>(runnable, value);
    }


    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    	// 调用FutureTask 的构造方法
        return new FutureTask<T>(callable);
    }

// 提交一个runnable任务
public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        // 调用newTaskFor 重载两个参数的方法
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        // 执行 ThreadPoolExecutor的execute方法
        execute(ftask);
        return ftask;
    }

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
	
	// FutureTask 重写的runnable 的run方法
	public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                	// 调用 RunnableAdapter 的重写Callable接口的call方法,将结果赋值给result
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                // 执行后 将结果赋值
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
  protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        赋值给outcome字段
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

拒绝策略

线程池ThreadPoolExecutor这个类中通过内部类继承RejectedExecutionHandler接口实现的拒绝处理方法。面向接口编程


//ThreadPoolExecutor execute方法会调用这个拒绝方法
 final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}



public static class AbortPolicy implements RejectedExecutionHandler {
       
        public AbortPolicy() { }

     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 在调用时抛出异常
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

  public static class DiscardPolicy implements RejectedExecutionHandler {
       
        public DiscardPolicy() { }

       // 不处理
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
  public static class CallerRunsPolicy implements RejectedExecutionHandler {
     
        public CallerRunsPolicy() { }

     
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 判断线程池是否调用shutdown方法
            if (!e.isShutdown()) {
            // 如果没有则在当前线程调用此任务
                r.run();
            }
        }
    }

 public static class DiscardOldestPolicy implements RejectedExecutionHandler {
      
        public DiscardOldestPolicy() { }
		
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        	// 如果为shutdown
            if (!e.isShutdown()) {
            	// 丢弃一个队列中的任务
                e.getQueue().poll();
                // 调用ThreadPoolExecutor  execute提交一个任务
                e.execute(r);
            }
        }
    }

最后

以上就是动人大米为你收集整理的java线程池源码分析的全部内容,希望文章能够帮你解决java线程池源码分析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(48)

评论列表共有 0 条评论

立即
投稿
返回
顶部