我是靠谱客的博主 搞怪手机,最近开发中收集的这篇文章主要介绍Java并发学习笔记20 线程池 ForkJoinPool,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

bilibili-Java并发学习笔记20 线程池 ForkJoinPool

基于 java 1.8.0

P64_ForkJoinPool原理与构造方式详解

  1. 分而治之
  2. 工作窃取

适合 CPU 密集型计算任务,不适合 IO 密集型任务

    /**
     * @param parallelism 并行度级别 默认值 = java.lang.Runtime#availableProcessors
     * @param factory  创建新线程的工厂 默认值 = defaultForkJoinWorkerThreadFactory
     * @param handler  由于执行任务时遇到不可恢复的错误而终止的内部工作线程的处理程序 默认值为 null
     * @param asyncMode
     *      true  为从未连接的分叉任务建立本地先进先出调度模式。
                在工作线程只处理事件式异步任务的应用程序中,此模式可能比默认的基于本地堆栈的模式更合适。
     *      默认值 = false
     * @throws IllegalArgumentException 如果并行度小于或等于零,或大于实现限制
     * @throws NullPointerException if the factory is null
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }

P65_ForkJoinTask详解与示例分析

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    V result;
    protected abstract V compute();
}

public abstract class RecursiveAction extends ForkJoinTask<Void> {
    protected abstract void compute();
}
    /**
     * Submits a ForkJoinTask for execution.
     *
     * @param task the task to submit
     * @param <T> the type of the task's result
     * @return the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task;
    }

ForkJoinPool 示例

package new_package.thread.p64;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinTest {

    public static void main(String[] args) {
        //ForkJoinPool forkJoinPool = new ForkJoinPool(2);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        SumTask sumTask = new SumTask(1, 100, 10);
        System.out.println(forkJoinPool.invoke(sumTask));
        forkJoinPool.shutdown();
    }
}

class SumTask extends RecursiveTask<Integer> {

    int limit;
    int start;
    int end;

    public SumTask(int start, int end, int limit) {
        this.start = start;
        this.end = end;
        this.limit = limit;
    }

    @Override
    protected Integer compute() {

        int sum = 0;
        if ((end - start) <= limit) {
            System.out.println(Thread.currentThread().getName());
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }

        // Fork 步骤
        int mid = (end + start) / 2;
        SumTask leftTask = new SumTask(start, mid, limit);
        SumTask rightTask = new SumTask(mid + 1, end, limit);

        // Join 步骤
        invokeAll(leftTask, rightTask);
        Integer leftResult = leftTask.join();
        Integer rightResult = rightTask.join();
        return leftResult + rightResult;
    }
}

P66_CompletionService源码详解与示例剖析

// jdk 中 CompletionService 的唯一实现
public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;
    // ...
}

package new_package.thread.p66;

import java.util.concurrent.*;
import java.util.stream.IntStream;

/**
 * 以任务完成顺序获取到最后的结果集合
 */
public class CompletionServiceTest {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = new ThreadPoolExecutor(10,
                10,
                1,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue(),
                (r, executor) -> {

                });
        CompletionService<Integer> completionService = new ExecutorCompletionService(executorService);

        IntStream.range(0, 10).forEach(r -> {
            completionService.submit(() -> {
                try {
                    Thread.sleep((long) (Math.random() * 3000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
                return r + 1;
            });
        });

        for (int i = 0; i < 10; i++) {
            System.out.println(completionService.take().get());
        }

        executorService.shutdown();
    }
}

P67_ThreadLocalRandom在多线程竞争环境下的实现策略

  1. Random
import java.util.Random;

public class RandomTest {

    public static void main(String[] args) {

        Random random = new Random();
        System.out.println(random.nextInt(10));
    }
}
  • 多线程性能问题
    • 多线程同时操作种子更新,产生竞争(自旋锁)
  • 正确性没有问题,只是在多线程高并发情况下效率低下
  1. ThreadLocalRandom
import java.util.concurrent.ThreadLocalRandom;

public class ThreadLocalRandomTest {

    public static void main(String[] args) {
        ThreadLocalRandom threadLocalRandom = ThreadLocalRandom.current();
        System.out.println(threadLocalRandom.nextInt(10));
    }
}
  • 解决随机数生成在多线程高并发情况下效率低的问题
  • 在并发代码中,随机数生成使用 ThreadLocalRandom 较好

  • 随机数生成器
    • 随机数生成器种子
    • 随机数生成算法
  • 对于 ThreadLocalRandom 来说,其随机器生成器的种子存放在每个的线程的 ThreadLocal 中
    • Random 是共享同一个种子

ThreadLocalRandom.java

    /**
     * Returns the {@link #current() current} thread's {@code ThreadLocalRandom}.
     * @return the {@link #current() current} thread's {@code ThreadLocalRandom}
     */
    private Object readResolve() {
        return current();
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long SEED;
    private static final long PROBE;
    private static final long SECONDARY;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> tk = Thread.class;
            SEED = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSeed"));
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
            SECONDARY = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

Thread.java

    // The following three initially uninitialized fields are exclusively
    // managed by class java.util.concurrent.ThreadLocalRandom. These
    // fields are used to build the high-performance PRNGs in the
    // concurrent code, and we can not risk accidental false sharing.
    // Hence, the fields are isolated with @Contended.

    /** The current seed for a ThreadLocalRandom */
    @sun.misc.Contended("tlr")
    long threadLocalRandomSeed;

    /** Probe hash value; nonzero if threadLocalRandomSeed initialized */
    @sun.misc.Contended("tlr")
    int threadLocalRandomProbe;

    /** Secondary seed isolated from public ThreadLocalRandom sequence */
    @sun.misc.Contended("tlr")
    int threadLocalRandomSecondarySeed;

最后

以上就是搞怪手机为你收集整理的Java并发学习笔记20 线程池 ForkJoinPool的全部内容,希望文章能够帮你解决Java并发学习笔记20 线程池 ForkJoinPool所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(58)

评论列表共有 0 条评论

立即
投稿
返回
顶部