我是靠谱客的博主 俊秀巨人,最近开发中收集的这篇文章主要介绍202109项目总结: MyBatis Cursor + Parallel Stream使用概述1. MyBatis的Cursor2. ParallelStream使用其他,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

目录

概述

1. MyBatis的Cursor

注意点:

使用例子

2. ParallelStream使用

parallelStream的线程配置:

注意点:

其他


概述

最近刚完成一个项目,总结一下项目中使用到的新技术。可能这些技术已经推出很久很久了,可能很多人每天都在用的,但自己而言以前没使用过的,那就是新的技术;平时做项目,使用的不少技术,甚至很多代码都是从原有的老系统的代码中复制过来的,相当于一年经验用了3年,5年;因此从这次项目开始,写代码在原来的习惯上有所改变,每次都加入一些新东西;记录一下。

这个项目现在还记得用的新技术有:

1. MyBatis的Cursor

2. stream中的parallelStream

1. MyBatis的Cursor

记得最初使用Mybatis,是将所有查询到的记录,都转换成List中的对象,逐条处理;

后来系统规模原来越大,一次查询可能返回几十万、几百万、甚至上千万条记录,如果还是采用将查询结果都转换成List中的对象,客户端的内存将可能会被撑爆,OOM;有些时候需要按照获取记录的内容进行统计处理,虽然满足条件的记录有百万、千万规模,但实际用到的可能是前面几万条(当初MySqL的统计函数性能还是不行----不过现在也没深入了解;或者有些处理模式是业务特有的场景,没有相关的统计函数支持);所以处理方式需要优化,不能再把所有符合查询条件的记录都获取到本地后再处理。

用到过的方式:

1. 一种方式就是分页查询,查询到的记录按照业务要求进行排序,每次取1000,10000条,记录下每次获取的最后一条记录的id号,如果当前页的数据不满足业务处理需要,下次接着上次最大id开始继续进行分页查询。这种方式简单,在原有的基础上变动不是很大。

可能的问题:

一是分了多次查询,可能对数据库端的性能会带来影响(这个没比较过,感觉中会有一定影响);

二是查询条件中加了一个因素:上次查询的最大id,由于加了这个条件,记得有时数据库查询,使用的索引会跑偏,来了个全表扫描;如果这个表有几千万条,几亿条记录,来一次全表扫描,系统就报警了(为了这个问题找DBA一起优化过好几次);

2. 除了分页查询,Mybatis推出的ResultHandler是更好的方式,相比上面分页方式,需要增加sql查询的条件,业务处理逻辑也需要做略微的修改,使用ResultHandler就简单多了,对查询sql语句无任何倾入性。每返回一条查询结果记录,转换成对象后,Mybatis就会调用一次ResultHandler进行处理;这个在处理流程上,感觉和最早都返回到List后再处理的方式没啥差别,最早是返回到List中,自己提供一个处理方法给到for循环(处理List中每条记录),现在只是把处理方法包装成一个类,直接将类提供给Mybatis,由Mybatis发起调用;

从3.4开始,Mybatis又提供了一个方式:Cursor,对查询返回的结果进行流式处理;Cursor是查询返回的一种Java类型,而ResultHandler方式由于在查询到数据MyBatis就调用外部类进行处理了,所以查询函数返回是void;拿到Cursor对象,就可以像以前一样逐条处理,而不用担心内存会被撑爆的情况;

注意点

不过使用Cursor有个情况要注意: 即整个查询、处理过程,要在一个事务里完成;Cursor是一点点返回数据,需要数据时才从后端加载进来(中间有优化另说),如果事务已经结束,Cursor也只能关闭,则后续Cursor无法再从后端加载数据,再使用这个Cursor就会报错了;

使用例子

虽然获取数据是一条条获取的,但是我们一般处理的时候,是获取到一定数量的记录,然后进行多线程并行批处理,使用ResultHandler,是Mybatis调用ResultHandler,我们不知道Mybatis什么时候调用结束,所以在主流程中,最后还需要判断一下作为缓存的List是否有数据存在,如果有的话,还需要最后再处理一次;

List<Record> lists = new ArrayList();
MySelfMapper.queryData(ResultHandler, XXX);
if(CollectionUtils.IsNotEmpty(lists)){
   handler(lists);
}

Cursor实现了Iterable接口,我们可以用Iterator做循环,每次获取一定数据量的记录进行并发批处理,不用担心最后List是否还有记录没处理完

Iterator iterator = cursor.iterator();
while( iterator.hasNext() ){
  for( int i = 0; i < MAX_LENGTH && iterator.hasNext(); i++) {
    list.add(iterator.next());
  }
  handler(list);
}  

不过有时从原来ResultHandler上改写代码,也会这么写:

// handlersingle中,将记录加入到list后,判断list的长度是否到该处理的时候了;
cursor.forEach( record -> {handlerSingle(record); } ); 
if( CollectionUtils.isNotEmpty(list)) {
  handler(list);
}

其他:

网上还有资料说以下使用方法,这次没使用,下次有机会再说了;

在mybatis-spring 1.3.0版本中新增加了MyBatisCursorItemReader类,需要spring-batch jar包的支持,通过MyBatisCursorItemReader我们可以对Cursor进行操作;

2. ParallelStream使用

原来大部分业务处理,都是数据库中查询,然后加载一定数量的记录到list中,交给一个线程池进行处理;

  1. 最早写法,自己搞个线程池,然后交给线程池处理,如果需要返回结果的,用Future接收一下处理结果;
  2. 后来用了Spring,Spring支持标注式异步,加上 @Asyc 标注,申明一下即可;和调用普通方法没啥差别。一样的,如果要返回结果,也是用Future接收返回;

现在stream提供了并行stream,正好使用一下;

从Java8开始,提供了Stream,以前一般都是使用串行stream,譬如在缓存路由配置时,只加载有效的路由,按照业务方编码转换成Map,方便检索;

Map<String, Route> routeMap = list.stream()
    .filter( route -> route.isEnable() )
    .collect(Collectors.groupingBy(Route::getSupportId()));

将串行的stream,改成并行的stream,非常简单,将 stream() 换成 parallelSteam()即可(这里我需要得到处理结果,成功还是失败;如果全部处理成功那就是成功;如果有一条处理失败,就要人工干预)

boolean bCalcResult = lists.parallelStream()
    .allMatch( record ->{ return dealSingle(record); });

parallelStream的线程配置:

parallelStream默认使用了fork-join框架,其默认线程数是CPU核心数;由于这里的业务处理,是耗CPU的,所以也就不用调整线程数(耗CPU的,线程数多了反而浪费时间在线程调度上 -- 这个看业务的特征了,以及需求方是要吞吐量优先,还是响应时间优先了)。不过将来有可能会使用到这个功能,也就看了一下(以前自己配置的线程,哪次没有配置过线程数?)

配置的2种方法:

1.全局设置在运行代码之前,加入如下代码: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "64"); 一般不建议修改,因为修改了这个参数,虽然改进了当前的业务逻辑,但对于整个项目中其它地方只是用来做非耗时的并行流运算,性能就不友好了,因为所有使用并行流paraparallelStream默认使用了fork-join框架,其默认线程数是CPU核心数。

在运行代码之前,加入如下代码:

System.setProperty(
  "java.util.concurrent.ForkJoinPool.common.parallelism", "20"
  );

2、代码块内部设置

自己定义的ForJoin调度,这个自己代码里没写过,就直接抄网上的代码了

ForkJoinPool forkJoinPool1 = new ForkJoinPool(20);
ForkJoinTask<Boolean> fs = forkJoinPool.submit(() -> 
  inputStream.allMatch( element -> {
    Thread.sleep(300);
    System.out.println(Thread.currentThread().getName());
    System.out.println("线程数量:" + Thread.activeCount());
    return new Random().nextInt(100) >= 0;
}));
try {
    result = fs.get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e){
    e.printStackTrace();
}
forkJoinPool.shutdown();

另外一个例子:

ForkJoinPool pool = new ForkJoinPool(2);

List<Integer> list = IntStream.range(0, 20).boxed()
                      .collect(Collectors.toList());
pool.submit(() -> {
   list.parallelStream().forEach(s -> {
      // 业务处理
      System.out.println("thread:" + Thread.currentThread().getName() + "value" + s);
   });
});

注意点:

最后列举一些使用parallelStream需要注意的地方:

1.parallelStream线程不安全问题(加锁、使用线程安全的集合或者集合采用collect()或reduce()操作就是满足线程安全的;

错误例子:

List list = new ArrayList<>();        
IntStream.rangeClosed(1, 10000).parallel().forEach(i -> list.add(i));

这样就会报错,ArrayList本身就是一个线程不安全的容器,在多线程的操作下,扩容操作可能会导致产生数组越界的异常。

此时,要么使用线程安全的容器,比如Vector,要么使用collect完成串行收集

2.parallelStream 适用的场景是CPU密集型的,假如本身电脑CPU的负载很大,那还到处用并行流,那并不能起到作用,切记不要在paralelSreram操作中使用IO流;

3.不要在多线程中使用parallelStream,原因同上类似,大家都抢着CPU是没有提升效果,反而还会加大线程切换开销;

4.线程关联的ThreadLocal将会失效

其他

按照原来习惯写法,写一个service,总是要先写Interface,然后再写一个Impl类;使用时注入Interface;

Interface是为了多种实现场景时考虑的,现实情况下,没有这没多的多种实现场景,因此这次项目,大部分的service,都是直接写了Impl类,没有用interface

--- 不过对外服务的,必须要写Interface;
--- 在业务端,很明显将来会扩充的地方,必须写Interface;

为了兼容将来的扩充,@Autowired Interface的地方,把@Qualifier都加上去了,这样以后Interface有新的实现,在不需要使用到新实现的地方,就不需要修改代码了;

另外还记得生成代理类的方式所有不同:
 用Interface,spring是采用JDK动态代理的方式, 而Class,使用cglib生成继承类,因此从执行性能效率上讲,采用Class的方式性能会稍微好点,尤其是繁忙业务场景下;这个也是当初考虑的因素吧;

最后

以上就是俊秀巨人为你收集整理的202109项目总结: MyBatis Cursor + Parallel Stream使用概述1. MyBatis的Cursor2. ParallelStream使用其他的全部内容,希望文章能够帮你解决202109项目总结: MyBatis Cursor + Parallel Stream使用概述1. MyBatis的Cursor2. ParallelStream使用其他所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(40)

评论列表共有 0 条评论

立即
投稿
返回
顶部