概述
1. 序言
1.1 基于Runnable创建无返回值的任务
-
学习Java高并发、多线程时,经常看到的程序示例都是在新线程中运行一个实现了Runnable接口的任务
new Thread(new Runnable() { @Override public void run() { System.out.printf("Task based on Runnable interface"); } }).start(); // 使用JDK 1.8以后的lambda表达式 new Thread(() -> System.out.printf("Task based on Runnable interface")).start()
-
Thread类的
start()
方法最终会调用操作系统创建一个新的线程,并在新线程中调用runnable.run()
实现任务的运行 -
同时,细心的读者可能会发现Thread类实现了Runnable接口,完全可以通过
thread.run()
启动任务 -
但是,相对
thread.start()
,thread.run()
不会新建线程,而是直接在当前线程运行任务。 -
关于这两个方法的区别,可以参考本人之前的博客:Java线程的状态转换 & 如何停止线程
-
更高级一点的,通过
ExecutorService
向线程池中提交一个实现了Runnable接口的任务ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(new Runnable() { @Override public void run() { System.out.println("Run task with thread pool"); } }); // lambda表达式的写法 executorService.submit(() -> System.out.println("Run task with thread pool"));
1.2 Runnable的局限性
-
在实际的代码开发中,我们经常遇到这样的需求:多线程并行计算,并将计算结果汇总
-
这时,还使用Runnable接口创建计算任务,并不能满足需求,因为Runnable接口的run()方法无返回值
public interface Runnable { public abstract void run(); }
-
可以基于Runnable接口创建非匿名类,在执行任务后获取result值
public static void main(String[] args) throws InterruptedException { ComputeTask task = new ComputeTask(24); new Thread(task).start(); // 等待一段时间,获取计算结果 Thread.sleep(500); System.out.println("Result: " + task.getResult()); } class ComputeTask implements Runnable { private final Integer input; private Integer result; public ComputeTask(Integer input) { this.input = input; result = -1; } @Override public void run() { try { System.out.println("Input: " + input); // 模式计算任务的执行需要耗费一定的时间 Thread.sleep(200); result = input * input; } catch (InterruptedException e) { // 异常需要使用try-catch语句显式处理 throw new RuntimeException(e); } } public Integer getInput() { return input; } }
-
上面的代码实现非常不优雅:
- 无法预料任务何时执行完毕,获取到的result可能不是真实值
- 同时,
run()
方法没有异常声明,无法将方法中的异常上抛给上层调用
1.3 Callable创建有返回值的任务
-
通过查阅资料,发现JDK 1.5以后,提供了Callable接口,用于创建有返回值并可以抛出异常的任务
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
-
JDK源码对Callable的描述如下,同时还介绍了Callable与Runnable接口之间的差异:返回值和抛异常
-
而想要执行一个Callable任务,必须基于ExecutorService的如下
submit()
方法,而非简单地new Thread()<T> Future<T> submit(Callable<T> task);
-
submit()
方法会返回一个Future对象,Future对象包含了Callable任务的执行结果,也就是其call()
方法的返回值 -
通过
Future.get()
方法,可以获取Callable任务的执行结果 -
基于Callable接口,上面的计算任务可以改写如下:
public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); try { Future<Integer> future = executorService.submit(new ComputeTask(24)); // 通过isDone()判断任务是否执行完毕,避免get()方法因为等待任务执行而阻塞当前线程 while (!future.isDone()) { System.out.println("Calculating ..."); Thread.sleep(100); } // get()方法不会阻塞 System.out.println("Result: " + future.get()); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } finally { executorService.shutdown(); // 线程池使用完后一定要记得关闭 } } class ComputeTask implements Callable<Integer> { private final Integer input; public ComputeTask(Integer input) { this.input = input; } @Override public Integer call() throws Exception { System.out.println("Input: " + input); // 模式计算任务的执行需要耗费一定的时间,sleep()方法的异常不用特殊处理,可以直接上抛 Thread.sleep(200); return input * input; } }
1.4 Runnable vs Callable
- 关于Runnable和Callable的简单介绍,这篇文章写得很不错:Runnable vs. Callable in Java
- 从如何提交Runnable或Callable任务,到二者在任务返回值、异常处理上的差异,让人对二者有一个简单而又全面的认识
- 题外话: 本人觉得Baeldung这个网站上的文章都挺不错的,对Java相关知识的介绍简单而又全面
- 碎片化学习知识时,可以考虑都看看该网站上的文章
2. Future
- 相对基于Runnable创建有返回值的任务,Callable任务获取返回结果更加 “准确”
- 这是依靠了Future的
isDone() + get()
方法,因为Future的get()是个阻塞方法,会等待任务执行完成后再返回结果 - 如果不使用isDone()方法轮询任务是否执行完毕,否则很容易在get()执行结果时发生阻塞
2.1 Future概述
- JDK源码对Future的描述如下:
- 总结如下:
- Future一个用于表示异步计算(其实就是异步任务)执行结果的类,提供了校验计算是否完成、取消计算、获取计算结果的一系列方法
- 通过Future的get()方法获取结果时,当前线程会被阻塞,直到计算完成并有结果返回
- Future提供了取消计算的cancel()方法,但是若计算已经完成则无法进行取消
- 如果想创建一个允许被取消但不需要返回值的任务,可以使用
Future<?>
并在任务中直接返回null即可
- 以下典型场景都可以使用Future,以实现异步:
- 计算密集型任务,如科学计算
- 操作大量数据集的任务,如大数据场景下对海量数据的处理
- 远程过程调用,如文件下载、网页服务等
2.2 FutureTask类
-
之前的代码中,为了获取任务的返回结果,基于Callable创建了任务,然后通过ExecutorService.submit()方法提交并执行任务
-
submit方法,返回了一个
Future<T>
对象,通过Future去获取任务的执行结果 -
本人将这样的任务叫future任务
-
通过查看源码可知,存在这样的类关系链:
Executors.newSingleThreadExecutor(),返回内含ThreadPoolExecutor的FinalizableDelegatedExecutorService --> FinalizableDelegatedExecutorService extends DelegatedExecutorService --> DelegatedExecutorService中的ExecutorService实际为ThreadPoolExecutor ThreadPoolExecutor extends AbstractExecutorServic --> AbstractExecutorService implements ExecutorService
-
最终,sumit()方法的实现在抽象类AbstractExecutorService中定义,返回的Future对象实际为FutureTask对象
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
-
FutureTask类的UML图如下
-
FutureTask类最终实现了Future和Runnable接口,因此FutureTask可以作为Runnable对象提交到Executor中或者作为Thread中的成员变量
-
AbstractExecutorService的三种submit()方法,都会将Callable或Runnable任务转为FutureTask,然后交给Executor进行执行
<T> Future<T> submit(Callable<T> task); // Future中包裹的计算结果,实际是Callable.run()的返回值 <T> Future<T> submit(Runnable task, T result); // Future中包裹的计算结果,是入参给定的result Future<?> submit(Runnable task); // Future中包裹的计算结果为null
2.3 创建并执行FutureTask
2.3.1 手动创建并执行FutureTask
-
FutureTask内含一个Callable类型的成员变量,其构造函数如下:
// 直接基于Callable对象创建FutureTask public FutureTask(Callable<V> callable) // 将Runnable对象转为返回给定result的Callable对象 public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
-
通过FutureTask的构造函数,便可以手动创建一个FutureTask
-
同时,根据FutureTask的UML可知,它其实是一个Runnable任务,因此可以直接通过
new Thread(futureTask).start()
提交并执行futureTask -
手动创建并执行FutureTask的完整代码如下,这里将FutureTask当做Runnable对象,传递给Thread
try { // 基于Callable创建FutureTask FutureTask<String> futureTask = new FutureTask<>(() -> { System.out.println("Generating string ..."); Thread.sleep(1000); return "hello lucy"; }); // 使用新的线程执行future task new Thread(futureTask).start(); // 主线程可以做一些其他的事情 // ... // 其他事情完成后,获取future task的返回结果 System.out.println("Result: " + futureTask.get()); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); }
2.3.2 利用ExecutorService自动创建并执行FutureTask
-
使用
ExecutorService.submit(Runnable task, T result)
方法,自动创建并执行有给定结果的FutureTask -
对应的submit()方法为
<T> Future<T> submit(Runnable task, T result);
// 新建线程池 ExecutorService executorService = Executors.newSingleThreadExecutor(); try { // 提交Runnable任务 Future<Integer> future = executorService.submit(() -> { try { Thread.sleep(1000); // 模拟消息的发送 System.out.println("Message send!"); } catch (InterruptedException e) { throw new RuntimeException(e); // Runnable不支持上爬异常,必须通过try-catch显式处理 } }, 200); // 获取执行结果 while (!future.isDone()) { System.out.println("Sending message ..."); Thread.sleep(500); } System.out.printf("Send message, return code: %d", future.get()); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } finally { executorService.shutdown(); // 关闭线程池 }
-
执行结果如下:
2.3.3 FutureTask当做Runnable对象
-
前面提到过,FutureTask实现了Runnable接口,因此它可以作为submit()方法的入参
-
这里使用的submit()方法为返回null值的
Future<?> submit(Runnable task);
// 新建线程池 ExecutorService executorService = Executors.newSingleThreadExecutor(); try { // 提交Callable任务 FutureTask<Integer> futureTask = new FutureTask<>(() -> { System.out.println("Calculating ..."); Thread.sleep(1000); return -1; }); Future<?> submit = executorService.submit(futureTask); // 直接通过futureTask获取future结果 while (!futureTask.isDone()) { System.out.println("Calculating ..."); Thread.sleep(500); } System.out.println("Result from FutureTask crested by new FutureTask(): " + futureTask.get()); // 2. 而通过submit()创建的另一个FutureTask,其结果为null System.out.println("Result from FutureTask crested by submit(): " + submit.get()); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } finally { executorService.shutdown(); }
-
执行结果如下:
-
注意: submit()方法返回的是基于该FutureTask创建的另一个FutureTask(futureTask2),在不给定result时,futureTask2.get()将返回null
3. 基于实战,学习Future接口中的方法
3.1 cancel() & isCancelled()
- Future提供了cancel()方法,用于取消future任务
boolean cancel(boolean mayInterruptIfRunning);
3.1.1 关于cancel() 方法的一些说明
-
其入参
mayInterruptIfRunning
,表示是否中断正在执行的future任务- 为false时,future任务将继续执行直到结束
- 表面上看,future任务是被取消了,因为无法通过get()方法获取执行结果。但实际上,该future任务仍会继续执行直到结束
-
cancel()方法返回true,表示成功取消future任务。
- 如果future任务已经完成、或者被取消、或出于其他原因不能被中断,则cancel()方法将返回false
-
cancel()方法执行完成后,无论是否成功取消future任务,isDone()方法将返回true。
- 若future任务被成功取消,通过get()方法获取执行结果,将触发
CancellationException
- 因此,isDone()方法返回true并不代表future任务成功执行,可能是future任务被取消
- 若future任务被成功取消,通过get()方法获取执行结果,将触发
-
Future接口,提供了与cancel()方法配套的isCancelled()方法,用于判断future任务是否在执行完毕前(执行中、未开始执行)被取消
boolean isCancelled();
-
通过get()方法获取执行结果时,最好加上isCancelled()方法,避免触发
CancellationException
!future.isCancelled() && future.isDone() // get()方法获取结果的前提条件
3.1.2 代码实战
-
下面的代码,将强制取消正在执行的任务。然后通过get()获取执行结果,将触发
CancellationException
public static void testFuture6() throws InterruptedException, ExecutionException { // 新建线程池 ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<Integer> future = executorService.submit(() -> { System.out.println("Calculating ..."); Thread.sleep(500); System.out.println("Finish calculating!"); return 200; }); // 一段时间后,强制取消future task Thread.sleep(300); boolean return_flag = future.cancel(true); if (return_flag) { System.out.println("成功取消future任务, isCancelled(): " + future.isCancelled()); } // 取消后的future任务,isDone()始终返回true;但是,通过get()获取执行结果,将抛出CancellationException try { if (future.isDone()) { System.out.println("执行结果: " + future.get()); } } catch (CancellationException e) { System.out.println("取消后的future任务,不能再使用get()方法"); } }
-
执行结果如下:
-
下面的代码,在future任务执行完毕后,再进行cancel操作,cancel()和isCancelled()方法,都将返回false
public static void main(String[] args) throws InterruptedException, ExecutionException { // 新建线程池 ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<Integer> future = executorService.submit(() -> { System.out.println("Calculating ..."); Thread.sleep(500); System.out.println("Finish calculating!"); return 200; }); // future任务执行完成后,再取消任务 while (!future.isDone()) { Thread.sleep(300); } System.out.println("执行结果: " + future.get()); // cancel()和isCancelled()方法,都将返回false boolean return_flag = future.cancel(true); if (!return_flag) { System.out.println("由于某些原因,取消future任务失败, isCancelled(): " + future.isCancelled()); } }
-
执行结果如下:
3.2 get()方法
3.2.1 较为优雅地使用get()方法
-
Future接口中get()方法的定义如下:
V get() throws InterruptedException, ExecutionException;
-
根据前面的描述,get()方法用于获取future任务的执行结果,但它是一个阻塞方法,会一直等待直到有结果返回
-
需调用isDone()方法来轮询future任务是否执行 “完毕” ,再通过get()方法获取执行结果
boolean isDone();
-
这里的完毕包括:正常执行结束、因为异常停止、被取消,该方法都将返回
true
-
加上前面对cancel()方法的学习,可以总结出获取执行结果的、较为完善的代码
- 调用isDone()方法来轮询future任务是否执行 “完毕”
- future任务执行 “完毕”,通过isCancelled()方法确定其是否是因为取消而结束
- 如果不是因为取消而结束,则可以调用get()方法获取执行结果
-
代码示例如下:
public static void main(String[] args) throws InterruptedException, ExecutionException { // 新建线程池 ExecutorService executorService = Executors.newFixedThreadPool(2); // 提交异步计算任务 Future<Integer> future = executorService.submit(() -> { Thread.sleep(500); return 200; }); // 在另一个线程取消future任务 executorService.submit(() -> { try { Thread.sleep(200); future.cancel(true); } catch (InterruptedException e) { // Runnable无法抛出异常,需要手动处理 throw new RuntimeException(e); } }); // main线程中,轮询future任务的状态 while (!future.isDone()) { System.out.println("Calculating ..."); Thread.sleep(100); } // 获取执行结果前,先确认future任务是否被取消 if (!future.isCancelled()) { System.out.println("执行结果:" + future.get()); } }
3.2.2 显式处理get()方法抛出的异常
-
如果查看了Future接口的源码,会发现get()可能抛出三种异常
CancellationException
:future任务被成功取消后,通过get()方法获取执行结果时,将会触发该异常ExecutionException
:future任务在执行的过程中抛出了异常,通过get()方法获取执行结果时,将会触发该异常InterruptedException
:调用get()方法等待执行结果时,所在的线程被中断,将会触发该异常
-
上面之所称为较为优雅地获取执行结果,那是因为在某些情况下,如果不显式处理get()方法抛出的异常,将出现令人费解的执行结果
-
下面的代码,将在另一个线程中获取future任务的执行结果。
public static void main(String[] args) throws InterruptedException { // 新建线程池 ExecutorService executorService = Executors.newFixedThreadPool(2); // 提交异步计算任务 Future<Integer> future = executorService.submit(() -> { Thread.sleep(200); if (new Random().nextInt(100) < 100) { // 总会抛出异常 throw new RuntimeException("任务执行过程中,遇到未知异常"); } Thread.sleep(200); return 200; }); executorService.submit(() -> { // main线程中,轮询future任务的状态 while (!future.isDone()) { System.out.println("Calculating ..."); Thread.sleep(100); } if (!future.isCancelled()) { System.out.println("执行结果:" + future.get()); } return 0; }); }
-
future任务抛出异常后,isDone()返回true,最终会通过get()方法获取执行结果
-
get()方法会抛出ExecutionException,但是执行结果让人费解,这个异常就像凭空消失了一样,并未对整个程序造成什么影响
-
自己的猜测(不一定准确):
- 根据对Java异常的学习,该异常应该会上抛给上层调用者,这里的上层调用者是main方法
- 而main方法的逻辑早已执行完毕,对应的线程也处于
TERMINATED
状态,根本无法感知到这个异常 - 备注: main线程和线程池中的线程是相互独立的,有疑问可以参考之前的博客:Java线程的状态转换 & 如何停止线程
-
因此,最好显式地处理这些异常
public static void main(String[] args) { // 新建线程池 ExecutorService executorService = Executors.newFixedThreadPool(2); // 提交异步计算任务 Future<Integer> future = executorService.submit(() -> { Thread.sleep(200); if (new Random().nextInt(100) < 100) { throw new RuntimeException("任务执行过程中,遇到未知异常"); } Thread.sleep(200); return 200; }); Thread mainThread = Thread.currentThread(); executorService.submit(() -> { try { // main线程中,轮询future任务的状态 while (!future.isDone()) { System.out.println("Calculating ..."); Thread.sleep(100); } System.out.println("执行结果:" + future.get()); return 0; // 表示成功获取计算结果 } catch (CancellationException | ExecutionException | InterruptedException e) { // 将get()方法有关的异常都进行显式处理 System.out.println("捕获" + e.getClass().getSimpleName() + "异常"); System.out.println("main方法执行完毕,对应的线程状态: " + mainThread.getState()); return -1; // 表示获取计算结果失败 } }); }
-
执行结果如下:
3.2.3 带timeout的get()方法
-
某些场景下,我们没有 “耐心” 等待future任务执行完毕;如果future任务在一定时间内未完成,可以认为它存在一定问题,可以 “放弃” 它
-
Future接口提供了带timeout的get()方法,正好可以满足上述需求。
-
与不带timeout的get()方法的唯一差异,获取结果timeout时,该方法会抛出TimeoutException
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
-
有了timeout的加持,可以不用通过isDone()方法轮询future任务的状态
public static void main(String[] args) { // 新建线程池 ExecutorService executorService = Executors.newFixedThreadPool(2); // 提交异步计算任务 Future<Integer> future = executorService.submit(() -> { Thread.sleep(500); return 200; }); executorService.submit(() -> { try { // 设置timeout,获取执行结果 System.out.println("执行结果:" + future.get(300, TimeUnit.MILLISECONDS)); return 0; // 表示成功获取计算结果 } catch (CancellationException | ExecutionException | InterruptedException | TimeoutException e) { if (e instanceof TimeoutException) { System.out.println("获取执行结果超时!"); } return -1; // 表示获取计算结果失败 } }); }
-
执行结果如下:
4. 总结
- 絮叨一下:本来很简单的一个Java知识点,却因为各种原因,导致学习了快一个月
- 其实,学习Future都是为了学习
ListenableFuture
做准备 - 自己平时基本都是使用Runnable创建任务,很少创建需要获取返回结果的任务,所以对Future的使用的很少
4.1 知识小结
Runnable vs Callable
- 二者都可以用来定义多线程中的任务,可以在Thread或者ExecutorService中执行Runnable任务,但是只能在ExecutorService中执行Callable任务
- 二者都是函数式接口,Runnable中定义了一个无返回值的run()方法,Callable中定义了一个有返回值的call()方法 —— 当需要多线程中的任务返回结果时,使用Callable更合适
- Runnable的run()方法的声明没有throws语句,不允许抛出检查型异常(checked exception);Callable的call()方法的声明有
throws Exception
语句,可以简单地将checked Exception上抛
Future接口
- Future一个用于表示异步任务执行结果的类,提供了校验计算是否完成、取消计算、获取计算结果的一系列方法
cancel()
方法:支持是否暂停正在执行中的任务,如果成功取消任务将会返回trueisCancelled()
方法:返回true,表示成功取消正在执行中的任务get()
方法 +isDone()
:通过轮询任务是否执行完毕,避免调用get()方法发生阻塞;如有时效性要求,可以使用带timeout的方法获取执行结果
FutureTask
- 同时实现了Runnable接口和Future接口的类,既可以当做表示异步计算结果的Future对象,又可以当做普通的Runnable对象
- 通过ExecutorService的submit()方法提交的Runnable或Callable任务,都将被封装为FutureTask交给Executor进行执行
- ExecutorService的三种submit()方法,都将返回一个FutureTask对象,只是
submit(Runnable task)
方法返回结果为null的FutureTask
4.2 参考链接
- Runnable vs. Callable in Java
- Guide to java.util.concurrent.Future
- Future and FutureTask in java
最后
以上就是陶醉太阳为你收集整理的Java Future学习1. 序言2. Future3. 基于实战,学习Future接口中的方法4. 总结的全部内容,希望文章能够帮你解决Java Future学习1. 序言2. Future3. 基于实战,学习Future接口中的方法4. 总结所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复