概述
第7章 并行数据处理与性能
7.1 并行流
- steam.sequential()
- steam.parallel()
- 最后一次调用会影响整个流水线。上面两个函数只是修改并行标志。
配置并行流使用的线程池
ForkJoinPool
Runtime.getRuntime().availableProcessors()
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", 12);
7.1.4 高效使用并行流
LongStream.rangeClosed
- 避免装箱消耗
- 生成数字范围,容易拆分独立小块,支持并行。
- 测量
- 留意装箱。
- 有些操作本身在并行流上的性能比顺序流差。例如:limit、findFirst
- 考虑背后的数据结构是否容易分解。LongStream.range工厂比stream.iterator容易分解。
- 终端操作中合并步骤的代价大小。例如:Collector中的combiner方法
流的数据源的可分解行
源 | 可分解性 |
---|---|
ArrayList | 极佳 |
LinkedList | 差 |
IntStream.range | 极佳 |
Stream.iterate | 差 |
HashSet | 好 |
TreeSet | 好 |
7.2 分支/合并框架
是ExecutorService接口的一个实现,他把子任务分给线程池(ForkJoinPool)中的工作线程
7.2.1 使用RecursiveTask
分治算法
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
分支合并框架执行并行求和案例[P151]
- fork和join
- compute
7.2.2 使用分支/合并框架的最佳做法
- 调用join方法会阻塞调用方,因此,有必要在两个子任务计算都开始后再调用join。
- 不应该在RecursiveTask内部使用ForkJoinPool的invoke方法,直接使用compute和fork方法。只有顺序代码应该用invoke来启动并行计算
- 两个子任务一个用fork,一个用compute比较好。
- 调试:调用compute的线程并不是调用方,是调用fork的那个
- 多核处理器上使用分支/合并框架不一定比顺序计算快。
- 必须选择一个标准,来决定任务是否要进一步拆分还是已小到可以顺序求值。
7.2.3 工作窃取
- 每个线程从队列头上取任务。
- 空闲线程每次随机从别的线程尾巴上窃取一个任务
ForkJoinPoll源码分析工作执行流程:
- ForkJoinWorkerThread.run调用ForkJoinPool.runWorker
- ForkJoinPool.runWorker调用ForkJoinPool.scan
- ForkJoinPool.scan调用WorkQueue.topLevelExec
- WorkQueue.topLevelExec循环调用ForkJoinTask.doExec和WorkQueue.poll直到没有任务
7.3 Spliterator
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
public static final int ORDERED = 0x00000010;
public static final int DISTINCT = 0x00000001;
public static final int SORTED = 0x00000004;
public static final int SIZED = 0x00000040;
public static final int NONNULL = 0x00000100;
public static final int IMMUTABLE = 0x00000400;
public static final int CONCURRENT = 0x00001000;
public static final int SUBSIZED = 0x00004000;
7.3.1 拆分过程
7.3.2 实现自己的Spliterator[P157]
- 以函数风格重写单词计数器
String<Character> stream = IntStream.range(0, SENTENCE.length()).mapToObj(SENTENCE::charAt);
- 让WorldCounter并行工作
private int countWorld(Stream<Character> stream) {
WordCounter wordCounter = stream.reduce(new wordCounter(0, true),
WorldCounter::accumulate,
WorldCounter::combine);
}
countWorld(stream.parallel())
- 运用WorldCounterSpliterator
Spliterator<Character> spliterator = new WorldCounterSpliterator(SENTENCE);
Stream<Charater> stream = StreamSupport.stream(spliterator, true);
countWorld(stream);
- 延迟绑定
最后
以上就是唠叨曲奇为你收集整理的第7章 并行数据处理与性能第7章 并行数据处理与性能的全部内容,希望文章能够帮你解决第7章 并行数据处理与性能第7章 并行数据处理与性能所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复