概述
前言
近期由于项目需求所以在网上查找分布式任务调度工具,最终找到了xxl-job,遂进行研究一番进行了下文的总结。
如设计一个分布式调度平台
熟练上手xxl-job之后就需要对xxl-job进行进一步的操作以便于后续可能的改造工作。对此查看源码是最好的手段,在查看源码之前,我们不妨了解一下如何设计一个健全的分布式调度框架。
从下图我们不难看出,要设计一个分布式调度平台,首先就是确定通信方式,从下图的调度中心和执行器的交互来看,它们使用的是RPC协议。
而且我们做的是分布式调度中心,这也就意味着执行器可能不止一个,所以我们必须使用一个注册中心来管理所有注册的执行器。完成执行器注册之后,我们的调度中心就可以管理这些任务了,按照用户配置的时间定时去调度执行器,触发调度之后,触发器受到调度中心的调度就会将调度结果返回给调度中心。由于调度执行任务不一定是立刻完成的,所以我们的任务会通过异步的方式提交执行,执行完成后会通过回调的方式将调用结果告知给调度中心,调度中心回根据执行结果来决定是否进行告警服务。
由此我们不难看出,要想设计一个分布式任务调度中心,我们需要下面几个东西:
- 注册服务
- RPC通信框架
- 调度服务
- 日志服务
- 告警服务
服务端启动流程详解
整体过程概述
经过上文之后,我们大概是了解了xxl-job的设计,现在我们不妨来看看服务端(xxl-job-admin)启动流程,由于xxl-job是一个典型的spring-boot项目,按照经验我们可以在各种config中看到蛛丝马迹。
于是经过笔者的查找我们看到了XxlJobAdminConfig这个类。从类图中我们不难看出他在bean完成初始化之后通过InitializingBean进行了一些特殊的操作,对此我们不妨步入源码查看究竟。
我们步入源码看到它继承InitializingBean所实现的afterPropertiesSet方法执行了下面的操作。代码很简单创建一个调度器之后就调用init进行初始化。我们不妨查看init做了什么操作。
@Override
public void afterPropertiesSet() throws Exception {
adminConfig = this;
xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
}
步入了init我们看到了各种helper类的启动操作。从注释中我们也可以看出这些操作分别是:
- 初始化i18n。
- JobTriggerPoolHelper这里面会完成一些线程池初始化的操作。
- 初始化注册监控相关,在这个操作里面,回每隔30秒进行一次注册表维护。
- 初始化失败处理监控器,对失败的情况进行监控,这里面会涉及一些失败发送邮箱或者重试的操作。
- 初始化任务完成器,将一些长时间没有响应的任务进行结束处理。
- 初始化报表统计,会进行一些成功失败的报表统计。
- 初始化调度器,执行任务调度处理。
public void init() throws Exception {
// init i18n
initI18n();
// admin trigger pool start
JobTriggerPoolHelper.toStart();
// admin registry monitor run
JobRegistryHelper.getInstance().start();
// admin fail-monitor run
JobFailMonitorHelper.getInstance().start();
// admin lose-monitor run ( depend on JobTriggerPoolHelper )
JobCompleteHelper.getInstance().start();
// admin log report start
JobLogReportHelper.getInstance().start();
// start-schedule ( depend on JobTriggerPoolHelper )
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
初始化触发器
我们先来看看JobTriggerPoolHelper.toStart();这段代码内部的逻辑,非常简单,无非就是初始化两个线程池,一个线程池是名为快触发线程池,另一个则是慢触发线程池。
从配置参数中我们可以看到这两个线程池的区别:
- 快线程池的最大线程数默认为200,慢线程池为100。
- 快线程池最多容纳1000个任务,慢线程池默认容纳2000个任务。
public void start(){
fastTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
}
});
slowTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
}
});
}
从参数角度了解了区别之后,我们在使用上进一步了解一下两者的区别,以下便是笔者通过IDEA找到的关于线程池的核心调用出代码addTrigger,从方法名不难看出就是执行任务的方法。
从代码中可以看到如果一分钟执行超过10次的任务就会通过slowTriggerPool执行,反之就通过fastTriggerPool执行。这也是设计者执行的巧妙所在,将那些可以快速执行的任务放到快线程池中快速执行完成。而将那些耗时且频繁的任务放到慢线程池中堆着慢慢消化,合理分配避免某些快任务因为慢任务而导致执行频率低下。
public void addTrigger(......) {
// choose thread pool
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}
// trigger
triggerPool_.execute(......)
}
维持注册表信息详解
接下来就是JobRegistryHelper的的start方法,由于代码比较长,笔者下文介绍时会以段落的形式展示。
该方法首先会声明一个线程池,从语义上可以猜测出这个线程池是负责注册或者删除执行器的线程池。而且这个线程池的拒绝策略也很特殊,会将任务再次执行一遍。
// for registry or remove
registryOrRemoveThreadPool = new ThreadPoolExecutor(
2,
10,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
r.run();
logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
}
});
这一点我们可以在调用处得以印证,可以看到往该线程池提交的任务都是关于group注册的。
后续我们又会看到这样一个守护线程,它做的事情很简单:
- 将超过90s的注册器删除。
- 从xxl_job_registry查找出更新时间大于现在+90s的执行器,即可能是最新注册的执行器,以appname作为key,相关地址作为value并将其存放到appAddressMap中。
- 从appAddressMap取出所有appName对应的地址,更新xxl_job_group执行器地址列表,组装成 addressListStr生成一个group并将其保存到xxl_job_group表中。
- 休眠30s后继续1-3的操作。
具体可参考下文注释进行阅读:
// for monitor
registryMonitorThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
// 从xxl_job_group找到所有的注册器的信息
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {
// remove dead address (admin/executor) 将超过90s的注册器删除
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}
// 从xxl_job_registry查找出更新时间大于现在+90s的执行器,以appname作为key,相关地址作为value并将其存放到appAddressMap中
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
for (XxlJobRegistry item: list) {
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
String appname = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appname);
if (registryList == null) {
registryList = new ArrayList<String>();
}
if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
appAddressMap.put(appname, registryList);
}
}
}
// fresh group address
for (XxlJobGroup group: groupList) {
//从appAddressMap取出所有appName对应的地址,更新xxl_job_group执行器地址列表,组装成 addressListStr
List<String> registryList = appAddressMap.get(group.getAppname());
String addressListStr = null;
if (registryList!=null && !registryList.isEmpty()) {
Collections.sort(registryList);
StringBuilder addressListSB = new StringBuilder();
for (String item:registryList) {
addressListSB.append(item).append(",");
}
addressListStr = addressListSB.toString();
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
}
//基于addressListStr生成group
group.setAddressList(addressListStr);
group.setUpdateTime(new Date());
//更新group更新时间
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
try {
//休眠30s
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
}
});
失败管理监视器
我们再来看看JobFailMonitorHelper的start代码。代码整体流程笔者都加以注意,大体步骤为:
- 从xxl_job_log找到执行失败的任务。
- lock log 将xxl_job_log表中这些任务alarm_status设置为-1,意为上锁,如果没锁成功下次循环继续上锁。
- 从xxl_job_log获取这些job的id。
- 根据xxl_job_log的id从xxl_job_info获取到这个任务的信息。
- 查看xxl_job_info失败的任务重试次数是否大于0,大于0则继续重试执行。
- 对于失败的任务,判断info是否为空,如果不为空,则进行告警,然后基于乐观锁更新xxl_job_log告警信息。
- 休眠10s,继续1-6的操作。
monitorThread = new Thread(new Runnable() {
@Override
public void run() {
// monitor
while (!toStop) {
try {
//从xxl_job_log找到执行失败的任务
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
for (long failLogId: failLogIds) {
// lock log 将xxl_job_log表中这些任务alarm_status设置为-1,意为上锁,如果没锁成功下次循环继续
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
continue;
}
//从xxl_job_log获取日志信息
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
//根据xxl_job_log的id从xxl_job_info获取到这个任务的信息
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
// 1、fail retry monitor 查看失败的任务重试次数是否大于0,大于0则继续重试
if (log.getExecutorFailRetryCount() > 0) {
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
String retryMsg = "<br><br><span style="color:#F39C12;" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}
// 2、fail alarm monitor
int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
// 如果info不为空,则进行告警
if (info != null) {
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
newAlarmStatus = alarmResult?2:3;
} else {
newAlarmStatus = 1;
}
//基于乐观锁更新xxl_job_log告警信息
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
}
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");
}
});
任务结束处理器
我们继续前进查看JobCompleteHelper的源码。第一步也还是创建一个回调线程池,参数如下,可以看到拒绝策略任然是再次执行任务。
// for callback
callbackThreadPool = new ThreadPoolExecutor(
2,
20,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(3000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
r.run();
logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");
}
});
再往后查看代码,我们会发现一个守护线程monitorThread,查看它的核心工作代码如下,笔者都已相近注释,具体步骤为:
- 找到运行中状态超过10min的任务id。
- 拿着这个任务id组装出一个log对象
- 基于这个表对象将任务结果通过updateHandleInfoAndFinish设置为结束。
// monitor
while (!toStop) {
try {
// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
Date losedTime = DateUtil.addMinutes(new Date(), -10);
//找到丢失的任务id
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
if (losedJobIds!=null && losedJobIds.size()>0) {
for (Long logId: losedJobIds) {
//基于logId组装XxlJobLog
XxlJobLog jobLog = new XxlJobLog();
jobLog.setId(logId);
jobLog.setHandleTime(new Date());
jobLog.setHandleCode(ReturnT.FAIL_CODE);
jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
//基于jobLog将任务结果结束
XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
}
}
try {
TimeUnit.SECONDS.sleep(60);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
我们再来看看updateHandleInfoAndFinish的代码,逻辑也很简单,根据log对象的code值组装对应的msg到xxlJobLog中,然后更新到xxl_job_log表中。
public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) {
// 根据xxlJobLog任务的code得到对应的msg并将其设置到xxlJobLog中
finishJob(xxlJobLog);
// text最大64kb 避免长度过长
if (xxlJobLog.getHandleMsg().length() > 15000) {
xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) );
}
// 更新xxl_job_log中这个任务的信息
return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog);
}
报表处理
还记得我们登录xxl-job-admin时,哪个报表界面吗?这个页面的数据就是通过JobLogReportHelper进行处理的。对此,我们不妨打开源码一探究竟。
源码如下,笔者也已经相近注释,核心逻辑为:
- 获取今天、昨天、前天的任务总数、正在运行数、成功数,得出统计信息更新到表中。
- 查看日志保留天数,如果到期则将过期日志删除。
while (!toStop) {
// 1、log-report refresh: refresh log report in 3 days
try {
for (int i = 0; i < 3; i++) {
// 获取Calendar都西昂
Calendar itemDay = Calendar.getInstance();
//查看今天-i天时的数据,按照循环3次我们可以得出回查看今天、昨天、前天的数据
itemDay.add(Calendar.DAY_OF_MONTH, -i);
itemDay.set(Calendar.HOUR_OF_DAY, 0);
itemDay.set(Calendar.MINUTE, 0);
itemDay.set(Calendar.SECOND, 0);
itemDay.set(Calendar.MILLISECOND, 0);
Date todayFrom = itemDay.getTime();
itemDay.set(Calendar.HOUR_OF_DAY, 23);
itemDay.set(Calendar.MINUTE, 59);
itemDay.set(Calendar.SECOND, 59);
itemDay.set(Calendar.MILLISECOND, 999);
Date todayTo = itemDay.getTime();
// 初始化一个xxlJobLogReport对象
XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
xxlJobLogReport.setTriggerDay(todayFrom);
xxlJobLogReport.setRunningCount(0);
xxlJobLogReport.setSucCount(0);
xxlJobLogReport.setFailCount(0);
//查出当天触发的任务数、正在运行数、成功数
Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
if (triggerCountMap!=null && triggerCountMap.size()>0) {
int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;
xxlJobLogReport.setRunningCount(triggerDayCountRunning);
xxlJobLogReport.setSucCount(triggerDayCountSuc);
xxlJobLogReport.setFailCount(triggerDayCountFail);
}
// 将上述结果更新到xxl_job_log_report表中
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
if (ret < 1) {
XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);
}
}
// 2、log-clean: switch open & once each day
//设置了保留日志天数且日志保留了24小时,则进入if逻辑内部
if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0
&& System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {
// 通过日志保留天数算出清除log时间
Calendar expiredDay = Calendar.getInstance();
expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());
expiredDay.set(Calendar.HOUR_OF_DAY, 0);
expiredDay.set(Calendar.MINUTE, 0);
expiredDay.set(Calendar.SECOND, 0);
expiredDay.set(Calendar.MILLISECOND, 0);
Date clearBeforeTime = expiredDay.getTime();
// clean expired log
List<Long> logIds = null;
do {
logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);
if (logIds!=null && logIds.size()>0) {
//删除过期日期数据
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
}
} while (logIds!=null && logIds.size()>0);
// update clean time
lastCleanLogTime = System.currentTimeMillis();
}
try {
TimeUnit.MINUTES.sleep(1);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
任务调度处理器(重点)
接下来就是xxl-job工作调度的核心源码了JobScheduleHelper,我们还是分两段来查看这其中的逻辑。先来看看第一段逻辑,这段逻辑是由scheduleThread 这个守护线程处理的,它的逻辑主要是负责安排任务的执行时间的:
- 查出未来5s要执行的任务。
- 如果发现这个任务执行时间距离现在已经过期5s,则根据策略要么立即触发要么安排下次处理时间吗。
- 如果发现这个任务在过期时间小于5s要么现在立刻执行,要么安排下次一次执行时间,并将这个时间。
- 剩下的都是未过期即将被执行的任务则全部存到一个ringdata的线程安全map中,这个map以秒为key,所有这个时间点执行的任务构成的list为value。
- 将job的时间安排结果更新到xxl_job_info表中。
// schedule thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
//查出可以读取的任务数,这里为6000,也就是说这个线程一次可以处理6000个任务信息
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
//上写锁,然后操作xxl_job_info表
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1、查出未来5s要执行的任务
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// 如果现在时间大于任务下次处理时间+5s,即当前任务到期了还没执行则进入if逻辑
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 如果任务的处理策略是FIRE_ONCE_NOW则立刻执行
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW 》 trigger
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
}
// 如果任务不是要求FIRE_ONCE_NOW则更新一下下次处理的时间
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {//过期时间小于5s
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
// 1、则提交到线程池中等待执行
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、设置下一次处理时间
refreshNextValidTime(jobInfo, new Date());
//如果下次处理时间还在5s内再次更新一下时间
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring将任务存到ringData这个ConcurrentHashMap中
pushTimeRing(ringSecond, jobInfo.getId());
// 3、设置下次执行时间
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3、update trigger info
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// tx stop
} catch (Exception e) {
if (!scheduleThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
}
}
........
});
接下来就是任务调度的逻辑了,这里的处理也很简单,从上文创建的rindData取出当前时间前2s的任务,然后提交到线程池中执行,避免没必要的延迟。
ringThread = new Thread(new Runnable() {
@Override
public void run() {
while (!ringThreadToStop) {
// align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
// 提前2s获取要么执行的任务存到ringItemData中
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData.size() > 0) {
// ringItemData中的任务全部提交到线程池中执行
for (int jobId: ringItemData) {
// do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// clear
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});
参考文献
xxl-job源码解析(看这一篇就够了,超简约且详细)
最后
以上就是欢喜板栗为你收集整理的微服务-sentinel主要特性基本使用服务注册到sentinel中流控-快速失败流控-关联配置流控设置线程数流控-预热流控排队等待源码地址的全部内容,希望文章能够帮你解决微服务-sentinel主要特性基本使用服务注册到sentinel中流控-快速失败流控-关联配置流控设置线程数流控-预热流控排队等待源码地址所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复