我是靠谱客的博主 呆萌烧鹅,最近开发中收集的这篇文章主要介绍Java并发编程——CompletableFuture详解Future/CallableCompletableFuture类的构造常用方法异常处理强制实战,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Future/Callable

Java 1.5开始,提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

Future接口可以构建异步应用,是多线程开发中常见的设计模式。

当我们需要调用一个函数方法时。如果这个函数执行很慢,那么我们就要进行等待。但有时候,我们可能并不急着要结果。

因此,我们可以让被调用者立即返回,让他在后台慢慢处理这个请求。对于调用者来说,则可以先处理一些其他任务,在真正需要数据的场合再去尝试获取需要的数据。

image-20211115122022181

简单实现

public class FutureCallableExample {
    
    static class CalculationCallable implements Callable<Integer> {
        private int x;
        private int y;

        public CalculationCallable(int x, int y) {
            this.x = x;
            this.y = y;
        }

        @Override
        public Integer call() throws Exception {
            System.out.println("begin call:" + new Date());
            TimeUnit.SECONDS.sleep(2); //模拟任务执行的耗时
            return x + y;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CalculationCallable calculationCallable = new CalculationCallable(1, 2);
        FutureTask<Integer> futureTask = new FutureTask<>(calculationCallable);
        
        new Thread(futureTask).start(); //本质上还是一个线程.
        System.out.println("begin execute futuretask:" + new Date());
        Integer rs = futureTask.get(); //阻塞方法-(join)
        System.out.println("result:" + rs + "");
        System.out.println("end execute futuretask:" + new Date());
    }
}

从上述代码中我们可以知道他的基本步骤:

  • 实现Callable接口,重写call方法
  • 将实现类放入FutureTask中
  • 通过new Thread()启动线程(可以看出本质上还是一个线程)
  • 通过get()方法获取返回值

运行结果:

begin execute futuretask:Mon Nov 15 12:25:13 CST 2021
begin call:Mon Nov 15 12:25:13 CST 2021
result:3
end execute futuretask:Mon Nov 15 12:25:15 CST 2021

局限性

从运行结果我们可以发现该异步处理方式有一个很严重的问题。我们获取返回结果必须等待异步线程执行完毕,在这个阶段,我们的主线程是出于阻塞阶段的。

实现原理

简单使用之后,我们应该可以大致推论出他的实现原理。

  • 我们通过start启动线程之后,会执行Callable接口中的call方法

    在FutureTask中的run方法会调用call方法。

  • 其他线程通过get()方法获取future的返回结果,如果任务未完成则加入栈中等待;如果执行完成则直接返回结果

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //没有返回结果则等待
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    
  • 当call方法返回结果时则唤醒栈中的阻塞线程

    image-20211114232039852

CompletableFuture

由于Future的局限性,所以在Java8中引入了CompletableFuture工具,同传统的Future相比,其支持流式计算、函数式编程、完成通知、自定义异常处理等很多新的特性。由于函数式编程在java中越来越多的被使用到,熟练掌握CompletableFuture,对于更好的使用java 8后的主要新特性很重要。简单起见,本文使用的CompletableFuture版本为java 8(java 11的CompletableFuture新增了一些方法)。

简单使用

首先,CompletableFuture类实现了CompletionStage和Future接口,因此你可以像Future那样使用它。

接下来通过栗子我们来逐渐解释他的使用。

构造函数创建

最简单的方式就是通过构造函数创建一个CompletableFuture实例。如下代码所示。由于新创建的CompletableFuture没有返回结果,调用get方法获取不到结果,当前线程会一直阻塞在这里。

public class CompleteExample {
    
    static class ClientThread implements Runnable {
        
        private CompletableFuture completableFuture;

        public ClientThread(CompletableFuture completableFuture) {
            this.completableFuture = completableFuture;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + ":" +
                        completableFuture.get()); //阻塞
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        CompletableFuture cf = new CompletableFuture();
        new Thread(new ClientThread(cf), "t1").start();
        new Thread(new ClientThread(cf), "t2").start();
    }
}

这个时候我们可以手动设置CompletableFuture的值,则上面线程中的结果就能返回。

public static void main(String[] args) {
    CompletableFuture cf = new CompletableFuture();
    new Thread(new ClientThread(cf), "t1").start();
    new Thread(new ClientThread(cf), "t2").start(); 
    cf.complete("Finish");
}

运行结果:

t1:Finish
t2:Finish

这展示了CompletableFuture最简单的创建及使用方法。

supplyAsync创建

CompletableFuture.supplyAsync()也可以用来创建CompletableFuture实例。通过该函数创建的CompletableFuture实例会异步执行当前传入的计算任务。在调用端,则可以通过get或join获取最终计算结果。

supplyAsync有两种实现方式:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
 
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

第一种只需传入一个Supplier实例(一般使用lamda表达式),此时框架会默认使用ForkJoin的线程池来执行被提交的任务。

第二种可以指定自定义的线程池,然后将任务提交给该线程池执行。

下面为使用supplyAsync创建CompletableFuture的示例:

public class CompletableFutureExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture cf = CompletableFuture.supplyAsync(()->"Hello World");
        System.out.println(cf.get());  //通过阻塞获取执行结果
        
        //() 表示形参,
        CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> "Hello World").thenAccept(xx -> {
            System.out.println(xx);
        });
        
        //可以继续做其他事情。
    }
}

上方代码演示了两种写法:

  • 第一种就是简单的创建线程,通过阻塞的方式获取执行结果。
  • 第二种是通过thenAccept在处程结束之后进行相应的处理;因为是链式编程,所以可以直接通过 . 调用thenAccept方法;xx是线程的返回参数,名字可以随意定义。

runAsync创建

CompletableFuture.runAsync()也可以用来创建CompletableFuture实例。与supplyAsync()不同的是,runAsync()传入的任务要求是Runnable类型的,所以没有返回值。因此,runAsync适合创建不需要返回值的计算任务。同supplyAsync()类似,runAsync()也有两种实现方式:

public static CompletableFuture<Void> runAsync(Runnable runnable)
 
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

下面为使用runAsync()的例子:

public class CompletableFutureExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture cf=CompletableFuture.runAsync(()->{
            System.out.println(Thread.currentThread().getName()+":异步执行一个任务");
        });
        System.out.println(cf.get());  //通过阻塞获取执行结果.。
    }
}

运行结果:

ForkJoinPool.commonPool-worker-9:异步执行一个任务
null

从输出结果我们可以发现,通过runAsync创建的实例是没有返回结果的,所以通过get方法只能返回一个null。

类的构造

我们可以查看一下CompletableFuture的实现:

  • 通过Future同步等待执行结果

  • CompletionStage,增强异步回调的功能

image-20211115130608459

CompletionStage

CompletionStage中有很多的方法,我们可以对其进行详细的划分:

  • 纯消费类型的方法

    纯消费类型的方法,指依赖上一个异步任务的结果作为当前函数的参数进行下一步计算,它的特点是不返回新的计算值,这类的方法都包含 Accept 这个关键字。在CompletionStage中包含9个Accept关键字的方法,这9个方法又可以分为三类:依赖单个CompletionStage任务完成,依赖两个CompletionStage任务都完成,依赖两个CompletionStage中的任何一个完成。

    //当前线程同步执行 
    public CompletionStage<Void> thenAccept(Consumer<? super T> action); 
    //使用ForkJoinPool.commonPool线程池执行action 
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); 
    //使用自定义线程池执行action 
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor); 
    
    public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); 
    public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); 
    public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,Executor executor); 
    
    public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action); 
    public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action); 
    public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
    
  • 有返回值类型的方法

    有返回值类型的方法,就是用上一个异步任务的执行结果进行下一步计算,并且会产生一个新的有返回值的CompletionStage对象。

    在CompletionStage中,定义了9个带有返回结果的方法,同样也可以分为三个类型:依赖单个CompletionStage任务完成,依赖两个CompletionStage任务都完成,依赖两个CompletionStage中的任何一个完成。

    public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); 
    public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn); 
    public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor); 
    
    public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); 
    public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); 
    public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor); 
    
    public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn); 
    public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn); 
    public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
    
  • 不消费也不返回的方法

    public CompletionStage<Void> thenRun(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action); 
    public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor); 
    
    public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action); 
    public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action); 
    public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor); 
    
    public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action); 
    public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action); 
    public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
    
  • 多任务组合

    public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor);
    

常用方法

thenApply

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture cf = CompletableFuture.supplyAsync(() -> "Apply:").thenApply(r -> {
        return r + "cc";
    });
    System.out.println(cf.get());
}

运行结果:

Apply:cc

在上面的示例中,通过调用thenApply将后置任务连接起来。该示例的最终打印结果为Apply:cc,可见程序在运行中,前置任务的结果返回后,会传递给通过thenApply连接的任务,从而产生一个新的结果。当然,在实际使用中,我们理论上可以无限连接后续计算任务,从而实现链条更长的流式计算。

需要注意的是,通过thenApply连接的任务,当且仅当前置任务计算完成时,才会开始后置任务的计算。因此,这组函数主要用于连接前后有依赖的任务链。

thenCombine

thenCombine最大的不同是连接任务可以是一个独立的CompletableFuture(或者是任意实现了CompletionStage的类型),从而允许前后连接的两个任务可以并行执行(后置任务不需要等待前置任务执行完成),最后当两个任务均完成时,再将其结果同时传递给下游处理任务,从而得到最终结果。

public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture cf = CompletableFuture.supplyAsync(() -> "Combine:")
        .thenCombine(CompletableFuture.supplyAsync(() -> "Message"), (r1, r2) -> r1 + r2);
    System.out.println(cf.get());
}

运行结果:

CombineMessage

从上述代码中可以看出通过thenCombine连接了一个新的CompletableFuture任务,最后将两个任务的返回结果拼接成一个新的返回结果。

一般,在连接任务之间互相不依赖的情况下,可以使用thenCombine来连接任务,从而提升任务之间的并发度。

异常处理

  • whenComplete

    whenComplete表示当任务执行完成后,会触发的方法,它的特点是,不论前置的CompletionStage任务是正常执行结束还是出现异常,都能够触发特定的 action 方法,主要方法如下。

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Occur Exception");
        });
    
        cf1.whenComplete((r, e) -> {
            if (e != null) {
                System.out.println("出现异常!");
            } else {
                System.out.println("received result is " + r);
            }
        });
    
    }
    

    运行结果:

    出现异常
    
  • handle

    handle表示前置任务执行完成后,不管前置任务执行状态是正常还是异常,都会执行handle中的fn 函数,它和whenComplete的作用几乎一致,不同点在于,handle是一个有返回值类型的方法。

    r 是返回值;th为异常信息。

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Occur Exception");
        }).handleAsync((r, th) -> {
            return th != null ? "出现异常" : r;
        });
        System.out.println(cf.get());
    }
    

    运行结果:

    出现异常
    
  • exceptionally

    exceptionally接受一个 fn 函数,当上一个CompletionStage出现异常时,会把该异常作为参数传递到 fn 函数

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture cf=CompletableFuture.supplyAsync(()->{
            throw new RuntimeException("Occur Exception");
        }).exceptionally(e->{
            System.out.println(e);
            return "Exceptionally";
        });
        
        System.out.println(cf.get());
    }
    

    运行结果:

    java.util.concurrent.CompletionException: java.lang.RuntimeException: Occur Exception
    Exceptionally
    

强制实战

粗略使用以下异步编程,自行创建spring boot项目。

当然我们在实际工作中由很多实现好的方式,比如Dubbo就是引入了CompletableFuture来实现对提供者的异步调用。

实体类

public class Goods {

    private Integer id;

    private String name;

    private BigDecimal price;

    private Integer repo;

    private Integer buyerNum;
    
    private List<String> comment;

    public Goods(Integer id, String name, BigDecimal price) {
        this.id = id;
        this.name = name;
        this.price = price;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public BigDecimal getPrice() {
        return price;
    }

    public void setPrice(BigDecimal price) {
        this.price = price;
    }

    public Integer getRepo() {
        return repo;
    }

    public void setRepo(Integer repo) {
        this.repo = repo;
    }

    public Integer getBuyerNum() {
        return buyerNum;
    }

    public void setBuyerNum(Integer buyerNum) {
        this.buyerNum = buyerNum;
    }

    public List<String> getComment() {
        return comment;
    }

    public void setComment(List<String> comment) {
        this.comment = comment;
    }
}

服务类

@Service
public class CommentService {

    public List<String> getCommentsByGoodsId(Integer goodsId) {
        return Arrays.asList("好", "一般", "非常好");
    }
}
@Service
public class GoodsService {
    public List<Goods> queryGoods() {
        return Arrays.asList(
                new Goods(1, "电脑", new BigDecimal(5000)),
                new Goods(2, "手机", new BigDecimal(5000)),
                new Goods(3, "书", new BigDecimal(5000)),
                new Goods(4, "杯子", new BigDecimal(5000))
        );
    }
}
@Service
public class RepoService {
    public Integer getRepoByGoodsId(Integer goodsId) {
        return new Random().nextInt(1000);
    }
}

接口

@RestController
public class GoodsController {

    @Autowired
    GoodsService goodsService;

    @Autowired
    CommentService commentService;

    @Autowired
    RepoService repoService;

    @GetMapping("/goods")
    public List<Goods> goods() throws ExecutionException, InterruptedException {
        //另外的线程在执行
        //查询商品信息
        CompletableFuture<List<Goods>> goodsFuture = CompletableFuture
                .supplyAsync(() -> goodsService.queryGoods());
        //等商品信息查询之后异步执行
        CompletableFuture cf = goodsFuture.thenApplyAsync(goods -> {
            goods.stream().map(goods1 -> CompletableFuture.supplyAsync(() -> {
                goods1.setRepo(repoService.getRepoByGoodsId(goods1.getId()));
                return goods1;
            }).thenCompose(goods2 -> CompletableFuture.supplyAsync(() -> {
                goods2.setComment(commentService.getCommentsByGoodsId(goods2.getId()));
                return goods2;
            }))).toArray(size -> new CompletableFuture[size]);
            return goods;
        });
        
        // 可以处理其他问题

        return (List<Goods>) cf.handleAsync((goods, th) -> th != null ? "系统繁忙" : goods).get();
    }
}

结果

[{"id":1,"name":"电脑","price":5000,"repo":399,"buyerNum":null,"comment":["好","一般","非常好"]},{"id":2,"name":"手机","price":5000,"repo":715,"buyerNum":null,"comment":["好","一般","非常好"]},{"id":3,"name":"书","price":5000,"repo":769,"buyerNum":null,"comment":["好","一般","非常好"]},{"id":4,"name":"杯子","price":5000,"repo":204,"buyerNum":null,"comment":["好","一般","非常好"]}]

总结

通过使用异步编程,性能可以得到一定的优化。比如上文代码种,在查询评论信息及库存信息的时候,就不需要串行执行了,而是通过异步的方式查询;所以在查询的时候我们可以进行一些其他操作,只有通过get()方法获取返回数据的时候才会阻塞。

最后

以上就是呆萌烧鹅为你收集整理的Java并发编程——CompletableFuture详解Future/CallableCompletableFuture类的构造常用方法异常处理强制实战的全部内容,希望文章能够帮你解决Java并发编程——CompletableFuture详解Future/CallableCompletableFuture类的构造常用方法异常处理强制实战所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(46)

评论列表共有 0 条评论

立即
投稿
返回
顶部