我是靠谱客的博主 唠叨曲奇,最近开发中收集的这篇文章主要介绍第7章 并行数据处理与性能第7章 并行数据处理与性能,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

第7章 并行数据处理与性能

7.1 并行流

  • steam.sequential()
  • steam.parallel()
  1. 最后一次调用会影响整个流水线。上面两个函数只是修改并行标志。

配置并行流使用的线程池

ForkJoinPool

Runtime.getRuntime().availableProcessors()

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", 12);

7.1.4 高效使用并行流

LongStream.rangeClosed

  1. 避免装箱消耗
  2. 生成数字范围,容易拆分独立小块,支持并行。
  1. 测量
  2. 留意装箱。
  3. 有些操作本身在并行流上的性能比顺序流差。例如:limit、findFirst
  4. 考虑背后的数据结构是否容易分解。LongStream.range工厂比stream.iterator容易分解。
  5. 终端操作中合并步骤的代价大小。例如:Collector中的combiner方法

流的数据源的可分解行

可分解性
ArrayList极佳
LinkedList
IntStream.range极佳
Stream.iterate
HashSet
TreeSet

7.2 分支/合并框架

是ExecutorService接口的一个实现,他把子任务分给线程池(ForkJoinPool)中的工作线程

7.2.1 使用RecursiveTask

分治算法

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {

分支合并框架执行并行求和案例[P151]

  • fork和join
  • compute

7.2.2 使用分支/合并框架的最佳做法

  • 调用join方法会阻塞调用方,因此,有必要在两个子任务计算都开始后再调用join。
  • 不应该在RecursiveTask内部使用ForkJoinPool的invoke方法,直接使用compute和fork方法。只有顺序代码应该用invoke来启动并行计算
  • 两个子任务一个用fork,一个用compute比较好。
  • 调试:调用compute的线程并不是调用方,是调用fork的那个
  • 多核处理器上使用分支/合并框架不一定比顺序计算快。
  • 必须选择一个标准,来决定任务是否要进一步拆分还是已小到可以顺序求值。

7.2.3 工作窃取

  • 每个线程从队列头上取任务。
  • 空闲线程每次随机从别的线程尾巴上窃取一个任务

ForkJoinPoll源码分析工作执行流程:

  1. ForkJoinWorkerThread.run调用ForkJoinPool.runWorker
  2. ForkJoinPool.runWorker调用ForkJoinPool.scan
  3. ForkJoinPool.scan调用WorkQueue.topLevelExec
  4. WorkQueue.topLevelExec循环调用ForkJoinTask.doExec和WorkQueue.poll直到没有任务

7.3 Spliterator

public interface Spliterator<T> {

    boolean tryAdvance(Consumer<? super T> action);

    Spliterator<T> trySplit();

    long estimateSize();

    int characteristics();

    public static final int ORDERED    = 0x00000010;

    public static final int DISTINCT   = 0x00000001;

    public static final int SORTED     = 0x00000004;

    public static final int SIZED      = 0x00000040;

    public static final int NONNULL    = 0x00000100;

    public static final int IMMUTABLE  = 0x00000400;

    public static final int CONCURRENT = 0x00001000;

    public static final int SUBSIZED = 0x00004000;

7.3.1 拆分过程

7.3.2 实现自己的Spliterator[P157]

  1. 以函数风格重写单词计数器
String<Character> stream = IntStream.range(0, SENTENCE.length()).mapToObj(SENTENCE::charAt);
  1. 让WorldCounter并行工作
private int countWorld(Stream<Character> stream) {
    WordCounter wordCounter = stream.reduce(new wordCounter(0, true),
                                            WorldCounter::accumulate,
                                            WorldCounter::combine);
}

countWorld(stream.parallel())
  1. 运用WorldCounterSpliterator
Spliterator<Character> spliterator = new WorldCounterSpliterator(SENTENCE);
Stream<Charater> stream = StreamSupport.stream(spliterator, true);
countWorld(stream);
  • 延迟绑定

最后

以上就是唠叨曲奇为你收集整理的第7章 并行数据处理与性能第7章 并行数据处理与性能的全部内容,希望文章能够帮你解决第7章 并行数据处理与性能第7章 并行数据处理与性能所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(65)

评论列表共有 0 条评论

立即
投稿
返回
顶部