我是靠谱客的博主 清爽月光,这篇文章主要介绍Collector原理解析一、collect方法使用二、collect方法的疑问三、 Collector 解析四、 Collector实现五 并发流总结六 toList()的实现也太简单了吧七:collect 源码的小探究八:总结,现在分享给大家,希望可以做个参考。

一、collect方法使用

先介绍一段代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
Person p1 = new Person("hehe", 20); Person p2 = new Person("wang", 20); Person p6 = new Person("hou", 25); Person p5 = new Person("hou", 25); Person p3 = new Person("mabi", 30); Person p4 = new Person("li", 30); List<Person> personList = Arrays.asList(p1, p2, p3, p4, p5, p6); List<String> per1 = personList.stream() .map(Person::getName).filter(String::toUpperCase).collect(toList()); --收集成集合 复制代码

这是最常用 最简单的函数式编程,经过包装后甚至类似于执行一个SQL;你只需要传入一些Predicate等 函数,就可以达到预期。 下面我们来研究下 collect(toList()) 究竟发生了什么。

二、collect方法的疑问

collect方法主要做收集使用,但并不意味着就要收集成集合,请记住这句话。

  1. collect 它是个重载方法: <R, A> R collect(Collector<? super T, A, R> collector); ->这是大多数人使用的 该方法要求传入一个 Collector ,返回一个R

<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner); ->这是另一个稍微底层的API

  1. 看这个方法瞬间懵逼,Collector是什么? 范型 T ,A ,R 分别代表了什么? 但你直接使用toList(),却觉得好像理所应当(我就想把person的Name收集成个集合嘛),这也是java8中函数式编程的意义之一,非常非常容易理解代码的含义。
  • –但你是否想过,它收集成了什么集合? ArrayList 、LinkedList 又或是其他实现?

三、 Collector 解析

一切都要源于 collect(Collector<? super T, A, R> collector);中的Collector

复制代码
1
2
3
4
5
6
7
8
9
10
public interface Collector<T, A, R> { Supplier<A> supplier(); BiConsumer<A, T> accumulator(); BinaryOperator<A> combiner(); Function<A, R> finisher(); Set<Characteristics> characteristics(); //...略 } 复制代码

Collector是一个接口,其中有几个抽象方法 supplier、accumulator、combiner、finisher、characteristics,当你构造一个它的实现类,传给collec(),就会按照你的要求来进行收集!

  1. Supplier<A> supplier()提供一个函数式接口 T get();
  • 就是生产者,我们需要给Collector提供一个容器来存放Stream中的元素。
  • Supplier s1 = HashSet::new
  • Supplier s2 = ArrayList::new
    类比:
  • StringBuilder::new, 创建一个StringBuilder来收集CharSequence 我们完成了容器初步的搭建
  • 思考:并发流会创建多个容器吗?

2.BiConsumer<A, T> accumulator() 提供一个计算器, void accept(T t, U u);

  • 他是一个消费者,而且还是二元消费者,需要提供 T U 它进行消费; 它的意义是:每次stream中的新元素传来,需要怎么将它归入supplier创建好的容器中。 比如:
  • List::add ->将元素加入List
  • StringBuilder::append -〉将CharSequence加入StringBuilder 我们完成了流中元素和容器间的关系
  • 思考:并发流会怎么执行呢?

  1. Function<A, R> finisher() 提供一个最终的返回结果
  • A 就是supplier中提供的容器, R是要最终返回的,A=R也是完全可以的。 比如说:
  • Function.identity(); 会直接返回你传入的 return t -> t;
  • 将List -> Set; List -> Map 都是OK的,完全看你想怎么把容器A返回。

4.Set<Characteristics> characteristics(); 这是一个声明: 1、排序问题UNORDERED,集合中是否按照顺序排呀? 2、并发问题CONCURRENT,可以支持多个线程操作(accumulator)同一个 A容器 3、容器和最终返回结果是否一致IDENTITY_FINISH finisher()中,A==R ?


  1. BinaryOperator<A> combiner() 提供一个融合器,这个是用在并发流中的;
  • BinaryOperator 是个二元处理器,接受 2个参数,返回1个,R apply(T t, U u) 这个是严格的
  • 来看看我的使用方法吧:
  • (List left, List right) -> { left.addAll(right); ->这是集合的addALl方法,将两个集合合二为一
  • (StringBuiler r1, StringBuiler r2) -> { r1.append(r2); return r1; } ->这是将两个StringBuiler拼接,返回其中一个
  • stream(),其实有个兄弟,叫stream().parallel() 或者 parallelStream(); 他们可以使得程序并发对stream进行处理,默认是看你cpu的核数 + cpu的超线程技术的总和;
  • 1、如果使用了并发流,那么每个线程将调用一次supplier,创建一个A, 每个线程都会去执行accumulator()把流中自己获取到的元素,加入自己的容器A。
  • 2、如果使用串行流,那么只会创建一个容器A;单线程accumulator。
  • 3、如果开启了parallelStream ,characteristics声明了CONCURRENT,那么不得了,程序将支持多个线程操作同一个容器A,A只会创建一个,这就直接导致了线程不安全问题:如果A是一个List,一个线程在遍历集合,另一个在添加元素,会抛出并发修改异常,如果同时都在set元素,会产生覆盖效果,此时,你提供的容器A,必须是线程安全的集合!! StringBuilder 就应该换成StringBuffer。 (请注意:此时就不会进行combiner,毕竟只有一个容器A)
  • 4、如果开启了parallelStream characteristics没声明CONCURRENT,那么就会使线程隔离,每个线程创建一个容器A,自己accumulate,再进行combiner,将多个容器融合(串行融合)。
  • 5、如果使用了 IDENTITY_FINISH 那么程序根本不会执行finisher,因为认为 A == R,直接返回A就得了,没必要执行。

四、 Collector实现

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class Collector_custom { public static void main(String[] args) { Set<Integer> collect = Arrays.asList(1, 2, 3, 3, 4, 5, 6).stream().collect(new MyCollector<Integer>()); System.out.println(collect); } public static class MyCollector<T> implements Collector<T, Set<T>, Set<T>> { @Override public Supplier<Set<T>> supplier() { System.out.println("MyCollector.supplier"); return HashSet::new; -->我们提供一个HashSet } @Override public BiConsumer<Set<T>, T> accumulator() { System.out.println("MyCollector.accumulator"); return Set::add; -->我们处理Set 和流中元素T的关系 } @Override public BinaryOperator<Set<T>> combiner() { System.out.println("MyCollector.combiner"); return (st1, st2) -> { st1.addAll(st2); return st1; ->如果是并发流,创建了多个容器,我们处理多个容器间的关系 }; } @Override public Function<Set<T>, Set<T>> finisher() { System.out.println("MyCollector.finisher"); return Function.identity(); -> 处理 容器和最终返回的规约,我们选择都是返回Set<T> } @Override public Set<Characteristics> characteristics() { System.out.println("MyCollector.characteristics"); return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH, UNORDERED)); --> 当我们使用了 IDENTITY_FINISH ,其实就不用再写finisher();不知道你明不明白? --> UNORDERED 不追求顺序,我们毕竟用的HashSet } } } 复制代码

java8自己也有实现: 和我们实现的有点区别,我们已经对5个参数实现好了, 它是需要用户自己传,这就是函数式编程!

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
static class CollectorImpl<T, A, R> implements Collector<T, A, R> { private final Supplier<A> supplier; private final BiConsumer<A, T> accumulator; private final BinaryOperator<A> combiner; private final Function<A, R> finisher; private final Set<Characteristics> characteristics; //自己传入 5个参数, CollectorImpl(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner, Function<A,R> finisher, Set<Characteristics> characteristics) { this.supplier = supplier; this.accumulator = accumulator; this.combiner = combiner; this.finisher = finisher; this.characteristics = characteristics; } CollectorImpl(Supplier<A> supplier, BiConsumer<A, T> accumulator, BinaryOperator<A> combiner, Set<Characteristics> characteristics) { this(supplier, accumulator, combiner, castingIdentity(), characteristics); } @Override public BiConsumer<A, T> accumulator() { return accumulator; } @Override public Supplier<A> supplier() { return supplier; } @Override public BinaryOperator<A> combiner() { return combiner; } @Override public Function<A, R> finisher() { return finisher; } @Override public Set<Characteristics> characteristics() { return characteristics; } } 复制代码

五 并发流总结

  • 如果是stream(),串行流,那么supplier创建一次容器A, accumulator 对每个元素累加,N个元素N次,combiner不执行,finisher主要看是否使用IDENTITY_FINISH;
  • 如果是parallelStream(),并行,主要看是否开启CONCURRENT,此枚举支持多个线程操作同一个容器A, 有可能导致并发安全问题 ;如果不开启,则每个线程创建一个容器A,自己对自己accumulate,最后再combiner,很安全。

六 toList()的实现也太简单了吧

复制代码
1
2
3
4
5
6
7
8
9
10
11
public static <T> Collector<T, ?, List<T>> toList() { return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add, (left, right) -> { left.addAll(right); return left; }, CH_ID); //用了内置的静态类CollectorImpl,传入了ArrayList::new,List::add、(left, right) -> { left.addAll(right); return left; //CH_ID 就是一个实现好了的Set枚举集合 } 复制代码

七:collect 源码的小探究

1、finisher()执行的时机

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) { A container; if (isParallel() && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT)) && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) { container = collector.supplier().get(); BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator(); forEach(u -> accumulator.accept(container, u)); } else { //假设我们是串行 container = evaluate(ReduceOps.makeRef(collector)); } //这里看到了IDENTITY_FINISH的作用吗? 如果有就返回 container,没有才去finisher() return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH) ? (R) container : collector.finisher().apply(container); } 复制代码

2、其他方法的执行时机 ->ReduceOps.makeRef(collector)

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
makeRef(Collector<? super T, I, ?> collector) { //执行了! Supplier<I> supplier = Objects.requireNonNull(collector).supplier(); BiConsumer<I, ? super T> accumulator = collector.accumulator(); BinaryOperator<I> combiner = collector.combiner(); class ReducingSink extends Box<I> implements AccumulatingSink<T, I, ReducingSink> { @Override public void begin(long size) { state = supplier.get(); } @Override public void accept(T t) { accumulator.accept(state, t); } @Override public void combine(ReducingSink other) { state = combiner.apply(state, other.state); } } return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) { @Override public ReducingSink makeSink() { return new ReducingSink(); } @Override public int getOpFlags() { //这里对UNORDERED 进行判断 return collector.characteristics().contains(Collector.Characteristics.UNORDERED) ? StreamOpFlag.NOT_ORDERED : 0; } }; } 复制代码

八:总结

熟悉了Collector后,看Collectors的所有方法,都会非常简单。

最后

以上就是清爽月光最近收集整理的关于Collector原理解析一、collect方法使用二、collect方法的疑问三、 Collector 解析四、 Collector实现五 并发流总结六 toList()的实现也太简单了吧七:collect 源码的小探究八:总结的全部内容,更多相关Collector原理解析一、collect方法使用二、collect方法的疑问三、内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(78)

评论列表共有 0 条评论

立即
投稿
返回
顶部