我是靠谱客的博主 贪玩香氛,最近开发中收集的这篇文章主要介绍xxl-job源码解读:触发器线程池TriggerPoolxxl-job源码解读:触发器线程池TriggerPool,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
xxl-job源码解读:触发器线程池TriggerPool
本文基于xxl-job的2.3.1版本
基本说明
作为触发器调用的统一入口,为触发器的调用提供线程池异步处理,并根据触发时间进行线程池的区分。
在不进行源码改动的情况下,共有四个地方会调用触发器JobTriggerPoolHelper.trigger
- 调度器触发执行:由定时任务的触发器正常调度
- 页面手动触发执行:从任务信息页面,点击执行一次,手动触发执行
- 失败监听器触发执行:如果任务执行失败,并且任务设置了失败重试次数,会根据重试次数再次调用触发器执行
- 父任务成功触发执行:设置了父子任务的情况下,父任务成功后,会由
XxlJobCompleter
触发调用子任务的触发器执行
触发器源码解读
JobTriggerPoolHelper
负责分配触发器线程池,并作为触发器调用的统一入口 。
类名全路径:com.xxl.job.admin.core.thread.JobTriggerPoolHelper
代码逻辑流程图
源码解读
调用入口方法:
/**
* 为任务添加一个触发器
* <p>可用于立即执行一次</p>
*
* @param jobId 触发的任务ID
* @param triggerType 添加的触发器类型
* @param failRetryCount >=0: use this param
* <0: use param from job info config
* @param executorShardingParam 分片执行参数
* @param executorParam null: use job param
* not null: cover job param
*/
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}
线程池区分+超时判断 :
快慢线程池定义,区别在队列大小,可以通过配置修改最大线程数
// fast/slow thread pool
private ThreadPoolExecutor fastTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = null;
public void start() {
fastTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()));
slowTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2000),
r -> new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()));
}
分配线程池进行执行。一分钟为周期,进行超时任务统计,高频调用的并且触发执行时间长的会被转移到慢线程池。
// job timeout count
private volatile long minTim = System.currentTimeMillis() / 60000; // ms > min
private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();
/**
* add trigger
*/
public void addTrigger(final int jobId,
final TriggerTypeEnum triggerType,
final int failRetryCount,
final String executorShardingParam,
final String executorParam,
final String addressList) {
// choose thread pool
ThreadPoolExecutor triggerPool = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
// 同一个任务 一分钟内任务触发超500ms 十次,转入慢线程池处理
if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool = slowTriggerPool;
}
// trigger
triggerPool.execute(() -> {
long start = System.currentTimeMillis();
try {
// do trigger
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// 检查时间循环,每整分钟清空一次超时触发集合
long minTimNow = System.currentTimeMillis() / 60000;
if (minTim != minTimNow) {
minTim = minTimNow;
jobTimeoutCountMap.clear();
}
// incr timeout-count-map
long cost = System.currentTimeMillis() - start;
if (cost > 500) { // ob-timeout threshold 500ms
// 根据JobId进行统计, 任务触发超过500ms认为超时, 统计超时次数
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if (timeoutCount != null) {
timeoutCount.incrementAndGet();
}
}
}
});
}
最后
以上就是贪玩香氛为你收集整理的xxl-job源码解读:触发器线程池TriggerPoolxxl-job源码解读:触发器线程池TriggerPool的全部内容,希望文章能够帮你解决xxl-job源码解读:触发器线程池TriggerPoolxxl-job源码解读:触发器线程池TriggerPool所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复