我是靠谱客的博主 贪玩香氛,最近开发中收集的这篇文章主要介绍xxl-job源码解读:触发器线程池TriggerPoolxxl-job源码解读:触发器线程池TriggerPool,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

xxl-job源码解读:触发器线程池TriggerPool

本文基于xxl-job的2.3.1版本

基本说明

作为触发器调用的统一入口,为触发器的调用提供线程池异步处理,并根据触发时间进行线程池的区分。

在不进行源码改动的情况下,共有四个地方会调用触发器JobTriggerPoolHelper.trigger

  1. 调度器触发执行:由定时任务的触发器正常调度
  2. 页面手动触发执行:从任务信息页面,点击执行一次,手动触发执行
  3. 失败监听器触发执行:如果任务执行失败,并且任务设置了失败重试次数,会根据重试次数再次调用触发器执行
  4. 父任务成功触发执行:设置了父子任务的情况下,父任务成功后,会由XxlJobCompleter 触发调用子任务的触发器执行

触发器源码解读

JobTriggerPoolHelper 负责分配触发器线程池,并作为触发器调用的统一入口 。

类名全路径:com.xxl.job.admin.core.thread.JobTriggerPoolHelper

代码逻辑流程图

在这里插入图片描述

源码解读

调用入口方法

    /**
     * 为任务添加一个触发器
     * <p>可用于立即执行一次</p>
     *
     * @param jobId                 触发的任务ID
     * @param triggerType           添加的触发器类型
     * @param failRetryCount        >=0: use this param
     *                              <0: use param from job info config
     * @param executorShardingParam 分片执行参数
     * @param executorParam         null: use job param
     *                              not null: cover job param
     */
    public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
        helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
    }

线程池区分+超时判断

快慢线程池定义,区别在队列大小,可以通过配置修改最大线程数

    // fast/slow thread pool
    private ThreadPoolExecutor fastTriggerPool = null;
    private ThreadPoolExecutor slowTriggerPool = null;

    public void start() {
        fastTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()));

        slowTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(2000),
                r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()));
    }

分配线程池进行执行。一分钟为周期,进行超时任务统计,高频调用的并且触发执行时间长的会被转移到慢线程池。

    // job timeout count
    private volatile long minTim = System.currentTimeMillis() / 60000;     // ms > min
    private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();


    /**
     * add trigger
     */
    public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) {

        // choose thread pool
        ThreadPoolExecutor triggerPool = fastTriggerPool;
        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
        // 同一个任务 一分钟内任务触发超500ms 十次,转入慢线程池处理
        if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
            triggerPool = slowTriggerPool;
        }

        // trigger
        triggerPool.execute(() -> {

            long start = System.currentTimeMillis();

            try {
                // do trigger
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {

                // 检查时间循环,每整分钟清空一次超时触发集合
                long minTimNow = System.currentTimeMillis() / 60000;
                if (minTim != minTimNow) {
                    minTim = minTimNow;
                    jobTimeoutCountMap.clear();
                }

                // incr timeout-count-map
                long cost = System.currentTimeMillis() - start;
                if (cost > 500) {       // ob-timeout threshold 500ms
                    // 根据JobId进行统计, 任务触发超过500ms认为超时, 统计超时次数
                    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                    if (timeoutCount != null) {
                        timeoutCount.incrementAndGet();
                    }
                }

            }

        });
    }

最后

以上就是贪玩香氛为你收集整理的xxl-job源码解读:触发器线程池TriggerPoolxxl-job源码解读:触发器线程池TriggerPool的全部内容,希望文章能够帮你解决xxl-job源码解读:触发器线程池TriggerPoolxxl-job源码解读:触发器线程池TriggerPool所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(72)

评论列表共有 0 条评论

立即
投稿
返回
顶部