概述
Future/Callable
Java 1.5开始,提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。
Future接口可以构建异步应用,是多线程开发中常见的设计模式。
当我们需要调用一个函数方法时。如果这个函数执行很慢,那么我们就要进行等待。但有时候,我们可能并不急着要结果。
因此,我们可以让被调用者立即返回,让他在后台慢慢处理这个请求。对于调用者来说,则可以先处理一些其他任务,在真正需要数据的场合再去尝试获取需要的数据。
简单实现
public class FutureCallableExample {
static class CalculationCallable implements Callable<Integer> {
private int x;
private int y;
public CalculationCallable(int x, int y) {
this.x = x;
this.y = y;
}
@Override
public Integer call() throws Exception {
System.out.println("begin call:" + new Date());
TimeUnit.SECONDS.sleep(2); //模拟任务执行的耗时
return x + y;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CalculationCallable calculationCallable = new CalculationCallable(1, 2);
FutureTask<Integer> futureTask = new FutureTask<>(calculationCallable);
new Thread(futureTask).start(); //本质上还是一个线程.
System.out.println("begin execute futuretask:" + new Date());
Integer rs = futureTask.get(); //阻塞方法-(join)
System.out.println("result:" + rs + "");
System.out.println("end execute futuretask:" + new Date());
}
}
从上述代码中我们可以知道他的基本步骤:
- 实现Callable接口,重写call方法
- 将实现类放入FutureTask中
- 通过new Thread()启动线程(可以看出本质上还是一个线程)
- 通过get()方法获取返回值
运行结果:
begin execute futuretask:Mon Nov 15 12:25:13 CST 2021
begin call:Mon Nov 15 12:25:13 CST 2021
result:3
end execute futuretask:Mon Nov 15 12:25:15 CST 2021
局限性
从运行结果我们可以发现该异步处理方式有一个很严重的问题。我们获取返回结果必须等待异步线程执行完毕,在这个阶段,我们的主线程是出于阻塞阶段的。
实现原理
简单使用之后,我们应该可以大致推论出他的实现原理。
-
我们通过start启动线程之后,会执行Callable接口中的call方法
在FutureTask中的run方法会调用call方法。
-
其他线程通过get()方法获取future的返回结果,如果任务未完成则加入栈中等待;如果执行完成则直接返回结果
public V get() throws InterruptedException, ExecutionException { int s = state; //没有返回结果则等待 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
-
当call方法返回结果时则唤醒栈中的阻塞线程
CompletableFuture
由于Future的局限性,所以在Java8中引入了CompletableFuture工具,同传统的Future相比,其支持流式计算、函数式编程、完成通知、自定义异常处理等很多新的特性。由于函数式编程在java中越来越多的被使用到,熟练掌握CompletableFuture,对于更好的使用java 8后的主要新特性很重要。简单起见,本文使用的CompletableFuture版本为java 8(java 11的CompletableFuture新增了一些方法)。
简单使用
首先,CompletableFuture类实现了CompletionStage和Future接口,因此你可以像Future那样使用它。
接下来通过栗子我们来逐渐解释他的使用。
构造函数创建
最简单的方式就是通过构造函数创建一个CompletableFuture实例。如下代码所示。由于新创建的CompletableFuture没有返回结果,调用get方法获取不到结果,当前线程会一直阻塞在这里。
public class CompleteExample {
static class ClientThread implements Runnable {
private CompletableFuture completableFuture;
public ClientThread(CompletableFuture completableFuture) {
this.completableFuture = completableFuture;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + ":" +
completableFuture.get()); //阻塞
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
CompletableFuture cf = new CompletableFuture();
new Thread(new ClientThread(cf), "t1").start();
new Thread(new ClientThread(cf), "t2").start();
}
}
这个时候我们可以手动设置CompletableFuture的值,则上面线程中的结果就能返回。
public static void main(String[] args) {
CompletableFuture cf = new CompletableFuture();
new Thread(new ClientThread(cf), "t1").start();
new Thread(new ClientThread(cf), "t2").start();
cf.complete("Finish");
}
运行结果:
t1:Finish
t2:Finish
这展示了CompletableFuture最简单的创建及使用方法。
supplyAsync创建
CompletableFuture.supplyAsync()也可以用来创建CompletableFuture实例。通过该函数创建的CompletableFuture实例会异步执行当前传入的计算任务。在调用端,则可以通过get或join获取最终计算结果。
supplyAsync有两种实现方式:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
第一种只需传入一个Supplier实例(一般使用lamda表达式),此时框架会默认使用ForkJoin的线程池来执行被提交的任务。
第二种可以指定自定义的线程池,然后将任务提交给该线程池执行。
下面为使用supplyAsync创建CompletableFuture的示例:
public class CompletableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf = CompletableFuture.supplyAsync(()->"Hello World");
System.out.println(cf.get()); //通过阻塞获取执行结果
//() 表示形参,
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> "Hello World").thenAccept(xx -> {
System.out.println(xx);
});
//可以继续做其他事情。
}
}
上方代码演示了两种写法:
- 第一种就是简单的创建线程,通过阻塞的方式获取执行结果。
- 第二种是通过thenAccept在处程结束之后进行相应的处理;因为是链式编程,所以可以直接通过
.
调用thenAccept方法;xx是线程的返回参数,名字可以随意定义。
runAsync创建
CompletableFuture.runAsync()也可以用来创建CompletableFuture实例。与supplyAsync()不同的是,runAsync()传入的任务要求是Runnable类型的,所以没有返回值。因此,runAsync适合创建不需要返回值的计算任务。同supplyAsync()类似,runAsync()也有两种实现方式:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
下面为使用runAsync()的例子:
public class CompletableFutureExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf=CompletableFuture.runAsync(()->{
System.out.println(Thread.currentThread().getName()+":异步执行一个任务");
});
System.out.println(cf.get()); //通过阻塞获取执行结果.。
}
}
运行结果:
ForkJoinPool.commonPool-worker-9:异步执行一个任务
null
从输出结果我们可以发现,通过runAsync创建的实例是没有返回结果的,所以通过get方法只能返回一个null。
类的构造
我们可以查看一下CompletableFuture的实现:
-
通过Future同步等待执行结果
-
CompletionStage,增强异步回调的功能
CompletionStage
CompletionStage中有很多的方法,我们可以对其进行详细的划分:
-
纯消费类型的方法
纯消费类型的方法,指依赖上一个异步任务的结果作为当前函数的参数进行下一步计算,它的特点是不返回新的计算值,这类的方法都包含 Accept 这个关键字。在CompletionStage中包含9个Accept关键字的方法,这9个方法又可以分为三类:依赖单个CompletionStage任务完成,依赖两个CompletionStage任务都完成,依赖两个CompletionStage中的任何一个完成。
//当前线程同步执行 public CompletionStage<Void> thenAccept(Consumer<? super T> action); //使用ForkJoinPool.commonPool线程池执行action public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); //使用自定义线程池执行action public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor); public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,Executor executor); public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
-
有返回值类型的方法
有返回值类型的方法,就是用上一个异步任务的执行结果进行下一步计算,并且会产生一个新的有返回值的CompletionStage对象。
在CompletionStage中,定义了9个带有返回结果的方法,同样也可以分为三个类型:依赖单个CompletionStage任务完成,依赖两个CompletionStage任务都完成,依赖两个CompletionStage中的任何一个完成。
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor); public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor); public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
-
不消费也不返回的方法
public CompletionStage<Void> thenRun(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor); public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor); public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
-
多任务组合
public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor);
常用方法
thenApply
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf = CompletableFuture.supplyAsync(() -> "Apply:").thenApply(r -> {
return r + "cc";
});
System.out.println(cf.get());
}
运行结果:
Apply:cc
在上面的示例中,通过调用thenApply将后置任务连接起来。该示例的最终打印结果为Apply:cc,可见程序在运行中,前置任务的结果返回后,会传递给通过thenApply连接的任务,从而产生一个新的结果。当然,在实际使用中,我们理论上可以无限连接后续计算任务,从而实现链条更长的流式计算。
需要注意的是,通过thenApply连接的任务,当且仅当前置任务计算完成时,才会开始后置任务的计算。因此,这组函数主要用于连接前后有依赖的任务链。
thenCombine
thenCombine最大的不同是连接任务可以是一个独立的CompletableFuture(或者是任意实现了CompletionStage的类型),从而允许前后连接的两个任务可以并行执行(后置任务不需要等待前置任务执行完成),最后当两个任务均完成时,再将其结果同时传递给下游处理任务,从而得到最终结果。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf = CompletableFuture.supplyAsync(() -> "Combine:")
.thenCombine(CompletableFuture.supplyAsync(() -> "Message"), (r1, r2) -> r1 + r2);
System.out.println(cf.get());
}
运行结果:
Combine:Message
从上述代码中可以看出通过thenCombine连接了一个新的CompletableFuture任务,最后将两个任务的返回结果拼接成一个新的返回结果。
一般,在连接任务之间互相不依赖的情况下,可以使用thenCombine来连接任务,从而提升任务之间的并发度。
异常处理
-
whenComplete
whenComplete表示当任务执行完成后,会触发的方法,它的特点是,不论前置的CompletionStage任务是正常执行结束还是出现异常,都能够触发特定的 action 方法,主要方法如下。
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Occur Exception"); }); cf1.whenComplete((r, e) -> { if (e != null) { System.out.println("出现异常!"); } else { System.out.println("received result is " + r); } }); }
运行结果:
出现异常
-
handle
handle表示前置任务执行完成后,不管前置任务执行状态是正常还是异常,都会执行handle中的fn 函数,它和whenComplete的作用几乎一致,不同点在于,handle是一个有返回值类型的方法。
r 是返回值;th为异常信息。
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture cf = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Occur Exception"); }).handleAsync((r, th) -> { return th != null ? "出现异常" : r; }); System.out.println(cf.get()); }
运行结果:
出现异常
-
exceptionally
exceptionally接受一个 fn 函数,当上一个CompletionStage出现异常时,会把该异常作为参数传递到 fn 函数
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture cf=CompletableFuture.supplyAsync(()->{ throw new RuntimeException("Occur Exception"); }).exceptionally(e->{ System.out.println(e); return "Exceptionally"; }); System.out.println(cf.get()); }
运行结果:
java.util.concurrent.CompletionException: java.lang.RuntimeException: Occur Exception Exceptionally
强制实战
粗略使用以下异步编程,自行创建spring boot项目。
当然我们在实际工作中由很多实现好的方式,比如Dubbo就是引入了CompletableFuture来实现对提供者的异步调用。
实体类
public class Goods {
private Integer id;
private String name;
private BigDecimal price;
private Integer repo;
private Integer buyerNum;
private List<String> comment;
public Goods(Integer id, String name, BigDecimal price) {
this.id = id;
this.name = name;
this.price = price;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public BigDecimal getPrice() {
return price;
}
public void setPrice(BigDecimal price) {
this.price = price;
}
public Integer getRepo() {
return repo;
}
public void setRepo(Integer repo) {
this.repo = repo;
}
public Integer getBuyerNum() {
return buyerNum;
}
public void setBuyerNum(Integer buyerNum) {
this.buyerNum = buyerNum;
}
public List<String> getComment() {
return comment;
}
public void setComment(List<String> comment) {
this.comment = comment;
}
}
服务类
@Service
public class CommentService {
public List<String> getCommentsByGoodsId(Integer goodsId) {
return Arrays.asList("好", "一般", "非常好");
}
}
@Service
public class GoodsService {
public List<Goods> queryGoods() {
return Arrays.asList(
new Goods(1, "电脑", new BigDecimal(5000)),
new Goods(2, "手机", new BigDecimal(5000)),
new Goods(3, "书", new BigDecimal(5000)),
new Goods(4, "杯子", new BigDecimal(5000))
);
}
}
@Service
public class RepoService {
public Integer getRepoByGoodsId(Integer goodsId) {
return new Random().nextInt(1000);
}
}
接口
@RestController
public class GoodsController {
@Autowired
GoodsService goodsService;
@Autowired
CommentService commentService;
@Autowired
RepoService repoService;
@GetMapping("/goods")
public List<Goods> goods() throws ExecutionException, InterruptedException {
//另外的线程在执行
//查询商品信息
CompletableFuture<List<Goods>> goodsFuture = CompletableFuture
.supplyAsync(() -> goodsService.queryGoods());
//等商品信息查询之后异步执行
CompletableFuture cf = goodsFuture.thenApplyAsync(goods -> {
goods.stream().map(goods1 -> CompletableFuture.supplyAsync(() -> {
goods1.setRepo(repoService.getRepoByGoodsId(goods1.getId()));
return goods1;
}).thenCompose(goods2 -> CompletableFuture.supplyAsync(() -> {
goods2.setComment(commentService.getCommentsByGoodsId(goods2.getId()));
return goods2;
}))).toArray(size -> new CompletableFuture[size]);
return goods;
});
// 可以处理其他问题
return (List<Goods>) cf.handleAsync((goods, th) -> th != null ? "系统繁忙" : goods).get();
}
}
结果
[{"id":1,"name":"电脑","price":5000,"repo":399,"buyerNum":null,"comment":["好","一般","非常好"]},{"id":2,"name":"手机","price":5000,"repo":715,"buyerNum":null,"comment":["好","一般","非常好"]},{"id":3,"name":"书","price":5000,"repo":769,"buyerNum":null,"comment":["好","一般","非常好"]},{"id":4,"name":"杯子","price":5000,"repo":204,"buyerNum":null,"comment":["好","一般","非常好"]}]
总结
通过使用异步编程,性能可以得到一定的优化。比如上文代码种,在查询评论信息及库存信息的时候,就不需要串行执行了,而是通过异步的方式查询;所以在查询的时候我们可以进行一些其他操作,只有通过get()方法获取返回数据的时候才会阻塞。
最后
以上就是呆萌烧鹅为你收集整理的Java并发编程——CompletableFuture详解Future/CallableCompletableFuture类的构造常用方法异常处理强制实战的全部内容,希望文章能够帮你解决Java并发编程——CompletableFuture详解Future/CallableCompletableFuture类的构造常用方法异常处理强制实战所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复