我是靠谱客的博主 积极小海豚,最近开发中收集的这篇文章主要介绍并发编程——线程池,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

目录

一、为什么要使用线程池

二、自定义线程池

自定义任务队列

自定义线程池管理器

拒绝策略接口

测试(任务生产者) 

总结流程

 三、线程池

ThreadPoolExecutor

线程池状态 

线程池的工作状态

四、Executors 类

newFixedThreadPool(固定大小的线程池)

newCachedThreadPool(带缓冲工厂的线程池)

 newSingleThreadExecutor(单线程的线程池)

五、提交任务

六、关闭线程池

shutdown方法

shutdownNow 

其他方法

七、任务调度(newScheduledThreadPool)

延时执行任务(schedule)

延时反复执行(scheduleAtFixedRate、scheduleWithFixedDelay )

八、处理异常 

线程池在执行异常任务时会怎么处理

捕获异常 

九、Fork/Join

十、线程池大小选择 

CPU 密集型运算

I/O 密集型运算 


一、为什么要使用线程池

1、在大量的任务的来临的时候,如果创建过多的线程,受CPU核心的限制,每个线程能分配到的时间片有限,就会导致频繁的上下文切换,导致系统性能降低

2、同时过多的线程的创建会导致消耗过多的内存,把服务器累趴下

3、减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务

二、自定义线程池

首先我们先通过自定义的线程池来了解下线程池的原理

分别是线程池管理器(任务的消费者)、任务队列、任务的生产者(很多任务),当线程池中没有空闲的线程时,任务将会被放在任务队列中。(如果没看懂,先看代码再看总结)

  • 自定义任务队列

class BlockingQueue<T> {
   // 1. 任务队列
   private Deque<T> queue = new ArrayDeque<>();
   // 2. 锁,防止多个线程获取同一个任务
   private ReentrantLock lock = new ReentrantLock();
   // 3. 生产者条件变量,当阻塞队列满了以后,生产者线程等待
   private Condition fullWaitSet = lock.newCondition();
   // 4. 消费者条件变量,当阻塞队列为空以后,消费者线程等待
   private Condition emptyWaitSet = lock.newCondition();
   // 5. 容量
   private int capcity;

   public BlockingQueue(int capcity) {
       this.capcity = capcity;
   }

   // 带超时阻塞获取
   public T poll(long timeout, TimeUnit unit) {
       lock.lock();
       try {
       // 将 timeout 统一转换为 纳秒
       long nanos = unit.toNanos(timeout);
       while (queue.isEmpty()) {
        try {
            // 剩余时间小于等于0了就不等了,返回
           if (nanos <= 0) {
              return null;
            }
            //返回值是 等待时间-经过的时间
            //防止虚假唤醒以后,再次进入循环依旧要等待初始时间,变化的
            nanos = emptyWaitSet.awaitNanos(nanos);
           } catch (InterruptedException e) {
                    e.printStackTrace();
          }
       }
             //如果阻塞队列被消耗了
             T t = queue.removeFirst();
             //唤醒生产者线程继续生产
             fullWaitSet.signal();
              return t;
            } finally {
            lock.unlock();
           }
   }


    // 带超时时间阻塞添加
    public boolean offer(T task, long timeout, TimeUnit timeUnit) {
    lock.lock();
     try {
      long nanos = timeUnit.toNanos(timeout);
       while (queue.size() == capcity) {
          try {
           if (nanos <= 0) {
                return false;
            }
            nanos = fullWaitSet.awaitNanos(nanos);
          } catch (InterruptedException e) {
           e.printStackTrace();
          }
        }
       //入过队列中添加了元素
       queue.addLast(task);
       //唤醒消费者线程
       emptyWaitSet.signal();
       return true;
     } finally {
      lock.unlock();
     }
   }
   //获取阻塞队列的大小
   public int size() {
     lock.lock();
     try {
      return queue.size();
     } finally {
        lock.unlock();
    }
 }
 //带自定义拒绝策略的的添加
 public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
    lock.lock();
    try {
     // 判断队列是否满
     if (queue.size() == capcity) {
     //执行拒绝策略的方法
      rejectPolicy.reject(this, task);
     } else { // 有空闲
       queue.addLast(task);
       emptyWaitSet.signal();
     }
    } finally {
     lock.unlock();
    }
   }
}
  • 自定义线程池管理器

class ThreadPool {
    // 任务队列
    private BlockingQueue<Runnable> taskQueue;
    // 工作线程集合
    private HashSet<Worker> workers = new HashSet<>();
    // 核心线程数
    private int coreSize;
    // 获取任务时的超时时间
    private long timeout;
    private TimeUnit timeUnit;
    private RejectPolicy<Runnable> rejectPolicy;
    
    // 执行任务
    public void execute(Runnable task) {
    // 当任务数没有超过 coreSize 时,直接交给 worker 工作线程对象执行
    // 如果任务数超过 coreSize 时,加入任务队列暂存
        synchronized (workers) {
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);//交给工作线程   
                workers.add(worker);//将工作线程交给工作线程队列
                worker.start();//启动
             } else {
                // 1) 死等
                // 2) 带超时等待
                // 3) 让调用者放弃任务执行
                // 4) 让调用者抛出异常
                // 5) 让调用者自己执行任务
                //拒绝策略,即到底如何处理多余的任务,交由创建线程池的创建者选择
                taskQueue.tryPut(rejectPolicy, task);
             }
         }
     }
   public ThreadPool(int coreSize, 
                    long timeout, 
                    TimeUnit timeUnit, 
                    int queueCapcity, 
                    RejectPolicy<Runnable> rejectPolicy)
                    {
                    this.coreSize = coreSize;//核心线程数
                    this.timeout = timeout;//获取任务的超时时间
                    this.timeUnit = timeUnit;//转换时间器
                    this.taskQueue = new BlockingQueue<>(queueCapcity);//阻塞队列
                    this.rejectPolicy = rejectPolicy;//拒绝策略,在构建线程池的时候定义
                                                     //具体的方法
                    }
   //工作线程
  class Worker extends Thread{
        //任务
        private Runnable task;
        public Worker(Runnable task) {
            this.task = task;
         }
       @Override
        public void run() {
        // 执行任务
        // 1) 当 task 不为空,执行任务
        // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行,如果一直等不到就推出循环
        while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
            try {
                log.debug("正在执行...{}", task);
                task.run();
             } catch (Exception e) {
                e.printStackTrace();
             } finally {
                task = null;
             }
        }
        //如果都没有任务了,那么该工作线程被移除
        synchronized (workers) {
            log.debug("worker 被移除{}", this);
            workers.remove(this);
         }
        }
     }
}
  • 拒绝策略接口

@FunctionalInterface 
// 拒绝策略
interface RejectPolicy<T> {
    void reject(BlockingQueue<T> queue, T task);
}
  • 测试(任务生产者) 

public static void main(String[] args) {
  ThreadPool threadPool = new ThreadPool(1,
          1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{
                  // 1. 死等
                  // queue.put(task);
                  // 2) 带超时等待
                  // queue.offer(task, 1500, TimeUnit.MILLISECONDS);
                  // 3) 让调用者放弃任务执行
                  // log.debug("放弃{}", task);
                  // 4) 让调用者抛出异常
                 // throw new RuntimeException("任务执行失败 " + task);
                 // 5) 让调用者自己执行任务
                 task.run();
            });
  for (int i = 0; i < 4; i++) { 
   int j = i;
   threadPool.execute(() -> { 
    try { 
     Thread.sleep(1000L);
    } catch (InterruptedException e) {
     e.printStackTrace();
    }
    log.debug("{}", j); 
   }); 
  }
 }
  • 总结流程

生者也就是调用线程池执行任务的我们,在创建了线程池以后,调用线程池的执行任务方法,线程池管理器中的会创建工作线程会挨个从生产者中获取任务执行,当创建的工作线程大于核心线程数的时候,剩下的任务将会被放到任务队列中,等待执行,当在工作中的线程执行完手头上的任务以后会去任务队列中去寻找任务执行(我们的实现是如果等待了一会任务队列还没有任务就会自动销毁工作线程)。

 三、线程池

  • ThreadPoolExecutor

        这是一个最基础的线程池的实现,用它介绍下线程池的结构,看看它的继承机构图

  • 线程池状态 

       ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

为什么要用共一个int类型呢?为了减少CAS操作的次数

补充:SHUTDOWN状态下,正在执行的任务会被执行完 ,STOP状态下阻塞队列中抛弃的任务会被收集起来,如果需要可以调用方法获取被抛弃的任务,下面关闭线程提到了

从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING

为什么RUNNING反而更小呢?因为使用的是高3位,最左边的位置是符号位,1表示负数

这些信息都会被存储到一个原子变量ctl中,来看看怎么存储的?

// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));
// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }
  • 线程池的工作状态

ThreadPoolExecutor的构造方法

public ThreadPoolExecutor(
    int corePoolSize,//核心线程数目 (最多保留的线程数)
    int maximumPoolSize,//最大线程数目-->等于核心线程数目+救急线程数目
    long keepAliveTime,//生存时间 - 针对救急线程
    TimeUnit unit,//时间单位 - 针对救急线程
    BlockingQueue<Runnable> workQueue,//阻塞队列
    ThreadFactory threadFactory,//线程工厂 - 可以为线程创建时起个好名字
    RejectedExecutionHandler handler)//拒绝策略

1、线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务

2、当线程数达到 corePoolSize (核心线程数目)并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。

3、如果队列选择了有界队列(必须是有界队列),那么任务超过了队列大小时,会创建 (maximumPoolSize - corePoolSize) 数目的线程来救急。

4、如果线程到达 maximumPoolSize(最大线程数)且任务队列也满了以后 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现

        4.1、AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略

        4.2、CallerRunsPolicy 让调用者运行任务

        4.3、DiscardPolicy 放弃本次任务

        4.4、DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之

5、当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime 和 unit 来控制。

注:核心线程不会自己销毁,是非守护线程,不会随着主线程的结束而结束

四、Executors 类

根据ThreadPoolExecutor的构造方法, Executors 类中提供了众多工厂方法来创建各种用途的线程池。

  • newFixedThreadPool(固定大小的线程池)

public static ExecutorService newFixedThreadPool(int nThreads) {
    //可以看出,最大线程数和核心线程数相等,即没有救急线程,所以救急线程0L
    //阻塞队列是无界队列,没有设置大小
    return new ThreadPoolExecutor(nThreads, nThreads,
                                   0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>());
}

 特点:

        1、核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
        2、阻塞队列是无界的,可以放任意数量的任务 

使用于任务量已知,相对耗时的任务

  • newCachedThreadPool(带缓冲工厂的线程池)

public static ExecutorService newCachedThreadPool() {
        //核心线程数为0,最大线程数取了很大的值,证明全是救急线程
        //救急线程销毁时间是60s
        //队列使用的是同步队列,没有容量,和无界区分开
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                            new SynchronousQueue<Runnable>());
    }

特点:
 1、核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着全部都是救急线程(60s 后可以回收)
2、救急线程可以无限创建
3、队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(就像是来一个任务就创建一个线程来取,没有线程取的意向就没有存的动作)

适合任务数比较密集,但每个任务执行时间较短的情况

来看看SynchronousQueue是怎么执行的,不太懂吧下面代码跑一次就行了

SynchronousQueue<Integer> integers = new SynchronousQueue<>();
//存入线程
new Thread(() -> {
    try {
log.debug("putting {} ", 1);
integers.put(1);
log.debug("{} putted...", 1);
log.debug("putting...{} ", 2);
integers.put(2);
log.debug("{} putted...", 2);
 } catch (InterruptedException e) {
e.printStackTrace();
 }
},"t1").start();
sleep(1);

//取线程1
new Thread(() -> {
try {
log.debug("taking {}", 1);
integers.take();
 } catch (InterruptedException e) {
e.printStackTrace();
 }
},"t2").start();
sleep(1);

//取线程2
new Thread(() -> {
try {
log.debug("taking {}", 2);
integers.take();
 } catch (InterruptedException e) {
e.printStackTrace();
 }
},"t3").start();

结果

11:48:15.500 c.TestSynchronousQueue [t1] - putting 1 
11:48:16.500 c.TestSynchronousQueue [t2] - taking 1 //没有取就会阻塞在put方法上
11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted...//取完put才会执行,这是put后的方法
11:48:16.500 c.TestSynchronousQueue [t1] - putting...2 
11:48:17.502 c.TestSynchronousQueue [t3] - taking 2 
11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted...
  •  newSingleThreadExecutor(单线程的线程池)

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
         (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
    }

特点

线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。

适用于希望多个任务排队执行

既然这样,为什么不能自己创建一个单线程执行任务呢?

自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作

那为什么不直接使用Executors.newFixedThreadPool(1) 构建呢?

因为Executors.newFixedThreadPool(1) 并不能一定保证它不被修改成为非单线程,因为对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改。而Executors.newSingleThreadExecutor()的外部包裹的FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法setCorePoolSize进行对线程数量的修改

五、提交任务

// 执行任务
void execute(Runnable command);

// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);

// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

List<Future<String>> futures = executorService.invokeAll(Arrays.asList(
                ()->{
                  log.debug("1");
                  Thread.sleep(1000);
                  return "1";
                },
                ()->{
                    log.debug("2");
                    Thread.sleep(1000);
                    return "2";
                },
                ()->{
                    log.debug("3");
                    Thread.sleep(1000);
                    return "3";
                }

        ));
        futures.forEach(f-> {
            try {
                log.debug("{}",f.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

String result = executorService.invokeAll(Arrays.asList(
                ()->{
                  log.debug("1");
                  Thread.sleep(1000);
                  return "1";
                },
                ()->{
                    log.debug("2");
                    Thread.sleep(500);
                    return "2";
                },
                ()->{
                    log.debug("3");
                    Thread.sleep(1000);
                    return "3";
                }

        ));
    

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;

六、关闭线程池

  • shutdown方法

线程池状态变为 SHUTDOWN

- 不会接收新任务

- 但已提交任务会执行完

- 此方法不会阻塞调用线程的执行

看看ThreadPoolExecutor内的shutdown方法是怎么定义的

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 修改线程池状态
            advanceRunState(SHUTDOWN);
            // 仅会打断空闲线程
            interruptIdleWorkers();
            onShutdown(); // 扩展给ScheduledThreadPoolExecutor提供的,不用管
        } finally {
            mainLock.unlock();
        }
        // 尝试终结(没有运行的线程可以立刻终结,其他线程自己运行完自己的任务结束)
        tryTerminate();
    }
  • shutdownNow 

线程池状态变为 STOP

- 不会接收新任务

- 会将队列中的任务返回

- 并用 interrupt 的方式中断正在执行的任务

看看ThreadPoolExecutor内的shutdownNow方法是怎么定义的

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 修改线程池状态
            advanceRunState(STOP);
            // 打断所有线程
            interruptWorkers();
            // 获取队列中剩余任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();//尝试终结,这个终结是肯定能伴随着主线程结束而结束的,因为所有其他线程都被打断了
        return tasks;
    }
  • 其他方法

// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

演示(shutdown)

ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(()->{
            log.debug("task1>>run");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("task1>>end");
        });

        executorService.submit(()->{
            log.debug("task2>>run");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("task2>>end");
        });

        executorService.submit(()->{
            log.debug("task3>>run");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("task3>>end");
        });
        
        log.debug("调用shutdown");
        executorService.shutdown();
        log.debug("shutdown执行后");

结果(shutdown)-->可以看出在调用shutdown以后运行线程池并没有结束,执行完手头上的任务和任务队列的任务以后才结束

01:43:25.785 a.w [pool-1-thread-1] - task1>>run
01:43:25.785 a.w [main] - 调用shutdown
01:43:25.785 a.w [pool-1-thread-2] - task2>>run
01:43:25.787 a.w [main] - shutdown执行后
01:43:26.792 a.w [pool-1-thread-1] - task1>>end
01:43:26.792 a.w [pool-1-thread-2] - task2>>end
01:43:26.792 a.w [pool-1-thread-1] - task3>>run
01:43:27.793 a.w [pool-1-thread-1] - task3>>end

演示( awaitTermination)

   ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(()->{
            log.debug("task1>>run");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("task1>>end");
        });

        executorService.submit(()->{
            log.debug("task2>>run");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("task2>>end");
        });

        executorService.submit(()->{
            log.debug("task3>>run");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.debug("task3>>end");
        });

        log.debug("调用shutdown");
        executorService.shutdown();
        executorService.awaitTermination(3, TimeUnit.MINUTES);
        log.debug("shutdown执行后");
    }

结果( awaitTermination)-->也可以使用future返回的结果get进行阻塞,因为awaitTermination不知道要等多久

01:49:17.410 a.w [pool-1-thread-1] - task1>>run
01:49:17.410 a.w [pool-1-thread-2] - task2>>run
01:49:17.410 a.w [main] - 调用shutdown
01:49:18.414 a.w [pool-1-thread-2] - task2>>end
01:49:18.414 a.w [pool-1-thread-1] - task1>>end
01:49:18.415 a.w [pool-1-thread-2] - task3>>run
01:49:19.417 a.w [pool-1-thread-2] - task3>>end
01:49:19.417 a.w [main] - shutdown执行后

演示(shutdownNow)

ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(()->{
            log.debug("task1>>run");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
               log.debug("被打断了");
            }
        });

        executorService.submit(()->{
            log.debug("task2>>run");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.debug("被打断了");
            }
        });

        executorService.submit(()->{
            log.debug("task3>>run");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                log.debug("被打断了");
            }
        });

        log.debug("调用shutdownNow");
        List<Runnable> runnables = executorService.shutdownNow();
        log.debug("shutdownNow执行后");
        runnables.forEach(p->log.debug("{}",p));

结果(shutdownNow)-->可见shutdownnow方法后线程池中正在执行任务的线程也会被打断,任务队列的任务也不会执行了

02:05:42.103 a.w [pool-1-thread-2] - task2>>run
02:05:42.103 a.w [pool-1-thread-1] - task1>>run
02:05:42.103 a.w [main] - 调用shutdownNow
02:05:42.105 a.w [pool-1-thread-1] - 被打断了
02:05:42.105 a.w [pool-1-thread-2] - 被打断了
02:05:42.105 a.w [main] - shutdownNow执行后
//打印了队列中没执行的任务
02:05:42.106 a.w [main] - java.util.concurrent.FutureTask@2f7c7260

七、任务调度(newScheduledThreadPool)

  • 延时执行任务(schedule)

参数1:执行的任务,参数2:延时执行的时间,参数3:时间单位

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        log.debug("begin");
        executor.schedule(()->{
            log.debug("任务1执行");
        },1,TimeUnit.SECONDS);

        executor.schedule(()->{
            log.debug("任务2执行") ;
        },1,TimeUnit.SECONDS);

 结果——>看到在延时1s后两个任务同时执行了,但是如果线程数小于任务数的时候,后面的任务会等待前面的任务执行完以后有空闲线程才能执行

13:59:18.691 a.testPool [main] - begin
13:59:19.739 a.testPool [pool-1-thread-1] - 任务1执行
13:59:19.739 a.testPool [pool-1-thread-2] - 任务2执行
  • 延时反复执行(scheduleAtFixedRate、scheduleWithFixedDelay )

1、scheduleAtFixedRate

参数1:执行的任务,参数2:第一个任务相对于主线程的延时时间,参数3:每次执行任务的时间间隔,参数4:时间单位

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        log.debug("begin");
        executor.scheduleAtFixedRate(()->{
            log.debug("任务1执行");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },1,1,TimeUnit.SECONDS);


14:06:05.501 a.testPool [main] - begin
14:06:06.547 a.testPool [pool-1-thread-1] - 任务1执行
14:06:08.548 a.testPool [pool-1-thread-1] - 任务1执行
14:06:10.550 a.testPool [pool-1-thread-1] - 任务1执行
14:06:12.552 a.testPool [pool-1-thread-1] - 任务1执行
14:06:14.554 a.testPool [pool-1-thread-1] - 任务1执行
//可以看出,第一个任务相对于主线程延时了1s
//每个任务循环执行的间隔时间是1s,1s后准备执行,此时上一个任务还没执行完,再等1s上一个任务执行完所以时间间隔被撑到了2s

2、scheduleWithFixedDelay

参数1:执行的任务,参数2:第一个任务相对于主线程的延时时间,参数3:每次执行任务的时间间隔,参数4:时间单位

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        log.debug("begin");
        executor.scheduleWithFixedDelay(()->{
            log.debug("任务1执行");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },1,1,TimeUnit.SECONDS);

14:12:24.440 a.testPool [main] - begin
14:12:25.484 a.testPool [pool-1-thread-1] - 任务1执行
14:12:28.487 a.testPool [pool-1-thread-1] - 任务1执行
14:12:31.491 a.testPool [pool-1-thread-1] - 任务1执行
14:12:34.495 a.testPool [pool-1-thread-1] - 任务1执行
14:12:37.500 a.testPool [pool-1-thread-1] - 任务1执行
//第一个任务还是主线程执行后等待1s执行
//后面的任务循环执行是:等待上一个任务执行完然后再间隔1s执行,所以是3s执行一个任务了

3、两者的区别:

scheduleAtFixedRate:前一个任务执行开始,下一个任务准备等待时间间隔准备执行,如果时间间隔等完了,上一个任务执行完,那么下一个任务开始执行,如果上一个任务还没有执行完,那么等待上一个任务执行完后立马执行 

scheduleWithFixedDelay:等待上一个任务执行完后,再等待时间间隔,再执行

八、处理异常 

  • 线程池在执行异常任务时会怎么处理

当线程池中线程执行任务的时候,任务出现未被捕获的异常的情况下
1、线程池会将允许该任务的线程从池中移除并销毁(异常后面的代码也不会执行),且同时会创建一个新的线程加入到线程池中
2、它不会影响到线程池里面其他正在正常运行的线程。

  • 捕获异常 

1、try...cath抓住异常,本任务后续代码还是会执行

2、future ,如果有异常,get()返回的是异常信息,本任务后续代码不会执行

ExecutorService executor = Executors.newFixedThreadPool(1);
        log.debug("begin");
        Future<Boolean> submit = executor.submit(() -> {
            log.debug("任务1执行");
            int i = 1 / 0;
            log.debug("任务1执行完毕");
            return true;
        });
        executor.submit(()->{
            log.debug("任务2执行");
             log.debug("任务2执行完毕");
        });

       log.debug("{}", submit.get());

17:05:29.679 a.testPool [pool-1-thread-1] - 任务1执行
17:05:29.680 a.testPool [pool-1-thread-1] - 任务2执行
17:05:29.680 a.testPool [pool-1-thread-1] - 任务2执行完毕
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at Test89.main(Test89.java:22)
Caused by: java.lang.ArithmeticException: / by zero
	at Test89.lambda$main$0(Test89.java:13)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

3、execute看到堆栈异常的输出,本任务后续代码不会执行

ExecutorService executor = Executors.newFixedThreadPool(1);
        log.debug("begin");
        executor.execute(() -> {
            log.debug("任务1执行");
            int i = 1 / 0;
            log.debug("任务1执行完毕");
        });
        executor.execute(()->{
            log.debug("任务2执行");
             log.debug("任务2执行完毕");
        });

17:07:21.047 a.testPool [main] - begin
17:07:21.088 a.testPool [pool-1-thread-1] - 任务1执行
17:07:21.089 a.testPool [pool-1-thread-2] - 任务2执行
17:07:21.089 a.testPool [pool-1-thread-2] - 任务2执行完毕
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
	at Test89.lambda$main$0(Test89.java:13)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

九、Fork/Join

Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算


所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解

Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率Fork/Join 默认会创建与 cpu 核心数大小相同的线程池

  ForkJoinPool线程池要求任务继承RecursiveTask抽象类

class AddTask1 extends RecursiveTask<Integer> {
  int n;
  public AddTask1(int n) {
   this.n = n;
  }
  @Override
  public String toString() {
return "{" + n + '}';
 }
   @Override
   protected Integer compute() {
   // 如果 n 已经为 1,可以求得结果了
   if (n == 1) {
    log.debug("join() {}", n);
    return n;
   }
   
    // 将任务进行拆分(fork)
   AddTask1 t1 = new AddTask1(n - 1);
   //交给一个线程执行它
   t1.fork();
   
    log.debug("任务() {} + {}", n, t1);
   
    // 合并(join)结果
   int result = n + t1.join();
   log.debug("join() {} + {} = {}", n, t1, result);
   return result;
 }

 public static void main(String[] args) {
  //不指定大小默认创建cpu核心数一样的线程
  ForkJoinPool forkJoinPool = new ForkJoinPool();
  System.out.println(forkJoinPool.invoke(new AddTask1(5)));
 }
}



17:28:49.582 a.AddTask1 [ForkJoinPool-1-worker-1] - 任务() 5 + {4}
17:28:49.582 a.AddTask1 [ForkJoinPool-1-worker-4] - 任务() 2 + {1}
17:28:49.582 a.AddTask1 [ForkJoinPool-1-worker-2] - 任务() 4 + {3}
17:28:49.582 a.AddTask1 [ForkJoinPool-1-worker-3] - 任务() 3 + {2}
17:28:49.582 a.AddTask1 [ForkJoinPool-1-worker-5] - join() 1
17:28:49.586 a.AddTask1 [ForkJoinPool-1-worker-4] - join() 2 + {1} = 3
17:28:49.586 a.AddTask1 [ForkJoinPool-1-worker-3] - join() 3 + {2} = 6
17:28:49.586 a.AddTask1 [ForkJoinPool-1-worker-2] - join() 4 + {3} = 10
17:28:49.586 a.AddTask1 [ForkJoinPool-1-worker-1] - join() 5 + {4} = 15
15

十、线程池大小选择 

过小会导致程序不能充分地利用系统资源、容易导致饥饿,过大会导致更多的线程上下文切换,占用更多内存。

  • CPU 密集型运算

通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费

  • I/O 密集型运算 

CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。

经验公式如下
线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间

例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式4 * 100% * 100% / 50% = 8
例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式4 * 100% * 100% / 10% = 40 

最后

以上就是积极小海豚为你收集整理的并发编程——线程池的全部内容,希望文章能够帮你解决并发编程——线程池所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(76)

评论列表共有 0 条评论

立即
投稿
返回
顶部