概述
无业务信息,只是问题记录积累。
问题
Dolphinscheduler某个调度任务,每个小时05分执行一次,但是凌晨3点05分的任务没有没有执行。之后的定时任务正常执行,偶发。
这个问题,网上也有一些解释和说明,但是我没太看懂。还是自己先弄清源码,这样下次遇到不会一样迷糊。
问题分析
看到DS日志,3点06分报如下日志,有4个触发器没有在计划时间“开火”。注意,下面对于MissFire Job 都称为“哑火作业”,这样更好理解。
Handling 4 trigger(s) that missed their scheduled fire-time
问题解决
修改线程数
修改quartz.properties,增加工作线程数,这个增加的值,可以参考服务器CPU的虚拟核数,甚至可以比虚拟核数大一些。
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 3 <- 修改这里
org.quartz.threadPool.threadPriority: 5
修改失败策略(哑火策略)
原来对于这种哑火作业,默认的策略是对已经错过的调度计划啥都不做,直接跳过,继续按调度周期执行。
但是我们的场景需要对于这种情况,能在服务器发现的时候,即上面的日志里面,3:06分打印“Handling 4 trigger(s) that missed their scheduled fire-time”的时候,可以重新调那些错过时间的任务。
Quartz对于CronTrigger有四个策略,我们选择MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY这个策略,即值为-1,就能实现重新调度器错过时间的任务,并且不错过任务。
MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY(-1)
MISFIRE_INSTRUCTION_SMART_POLICY(0)
MISFIRE_INSTRUCTION_FIRE_ONCE_NOW(1)
MISFIRE_INSTRUCTION_DO_NOTHING(2)
修改如下:
package org.apache.dolphinscheduler.service.quartz;
public class QuartzExecutors {
...
public void addJob(Class<? extends Job> clazz, int projectId, final Schedule schedule) {
...
CronTrigger cronTrigger = newTrigger()
.withIdentity(triggerKey)
.startAt(DateUtils.getTimezoneDate(startDate, timezoneId))
.endAt(DateUtils.getTimezoneDate(endDate, timezoneId))
.withSchedule(
cronSchedule(cronExpression)
//将withMisfireHandlingInstructionDoNothin改成withMisfireHandlingInstructionIgnoreMisfires
//.withMisfireHandlingInstructionIgnoreMisfires()
.withMisfireHandlingInstructionFireAndProceed()
.inTimeZone(DateUtils.getTimezone(timezoneId))
)
.forJob(jobDetail).build();
为什么改这里能实现对哑火作业的重新触发呢?下面分三个方向去说明:
正常Job的判断逻辑
QuartzSchedulerThread主线程,run函数,在拿到资源锁同时没有暂停信号的情况下,会通过一个while循环不断去库里面扫描合适时间的Job,主要是通过qsRsrcs.getJobStore()拿到JobStoreSupport对象,并调用该对象的acquireNextTriggers函数,记录下这个函数调用的入参:
入参一:now + idleWaitTime,即当前时间+空闲等待时间,默认这个空闲等待时间是0,所以第一个入参是传的当前时间。
入参二:Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()),取可用线程数和配置的batch数的最小值。
入参三:qsRsrcs.getBatchTimeWindow(),给一个窗口时间,默认是0。
@Override
public void run() {
...
while (!halted.get()) {
if(availThreadCount > 0) {//这里如果可用线程数为0,就不会再去扫描了。
List<OperableTrigger> triggers;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
//这里是获取运行正常作业的地方
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
acquiresFailed = 0;
if (log.isDebugEnabled())
} catch (JobPersistenceException jpe) {
...
}
...
}
跳转到JobStoreSupport对象,调用acquireNextTriggers函数,获取下一批要执行的Trigger,在这个函数中,进一步调用了getDelegate().selectTriggerToAcquire,获取合适的Trigger。根据上面分析的参数,可以知道:
noLaterThan这个参数是当前时间。
maxCount是可用线程数和批次大小的最小值。
timeWindow是0S。
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
throws JobPersistenceException {
return executeInNonManagedTXLock(lockName,
new TransactionCallback<List<OperableTrigger>>() {
public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
}
},
}
protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
throws JobPersistenceException {
do {
currentLoopCount ++;
try {
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
...
}
}
}
getDelegate()获得StdJDBCDelegate对象,调用selectTriggerToAcquire,先看传入的参数:
参数1,Connection对象
参数2,noLaterThan + timeWindow,因为timeWindow为0,所以最后的noLaterThan就是当前时间。
参数3,getMisfireTime(),当前时间减去阈值时间的,阈值默认是1分钟,那么getMisfireTime得到的时间就是当前时间的前一分钟。
package org.quartz.impl.jdbcjobstore;
public abstract class JobStoreSupport implements JobStore, Constants {
protected Class<? extends DriverDelegate> delegateClass = StdJDBCDelegate.class
protected long getMisfireTime() { //获取用于对比哑火作业的时间
long misfireTime = System.currentTimeMillis();
if (getMisfireThreshold() > 0) {
misfireTime -= getMisfireThreshold();
}
return (misfireTime > 0) ? misfireTime : 0;
}
}
参数4,可用线程数和批次大小的最小值。
这些参数传入下面的selectTriggerToAcquire函数后,拼接成了扫描SQL,获取合适的Trigger,看看他的SQL怎么写的,三个与条件分别是:
判断TRIGGER_STATE为WAITING
NEXT_FIRE_TIME<=noLaterThan,也就是当前时间
MISFIRE_INSTRUCTION哑火策略为-1或者哑火策略不为-1但是NEXT_FIRE_TIME要>=getMisfireTime(),也就是当前时间的前一分钟(就是没有额外进行配置的情况下)。
String SELECT_NEXT_TRIGGER_TO_ACQUIRE = "SELECT "
+ COL_TRIGGER_NAME + ", " + COL_TRIGGER_GROUP + ", "
+ COL_NEXT_FIRE_TIME + ", " + COL_PRIORITY + " FROM "
+ TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
+ " AND " + COL_TRIGGER_STATE + " = ? AND " + COL_NEXT_FIRE_TIME + " <= ? "
+ "AND (" + COL_MISFIRE_INSTRUCTION + " = -1 OR (" +COL_MISFIRE_INSTRUCTION+ " <> -1 AND "+ COL_NEXT_FIRE_TIME + " >= ?)) "
+ "ORDER BY "+ COL_NEXT_FIRE_TIME + " ASC, " + COL_PRIORITY + " DESC";
public List<TriggerKey> selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount)
try {
...
ps = conn.prepareStatement(rtp(SELECT_NEXT_TRIGGER_TO_ACQUIRE));
// Set max rows to retrieve
if (maxCount < 1)
maxCount = 1; // we want at least one trigger back.
ps.setMaxRows(maxCount);
ps.setFetchSize(maxCount);
ps.setString(1, STATE_WAITING);
ps.setBigDecimal(2, new BigDecimal(String.valueOf(noLaterThan)));
ps.setBigDecimal(3, new BigDecimal(String.valueOf(noEarlierThan)));
rs = ps.executeQuery();
...
}
...
}
从上面,可以总结出正常的作业就是通过扫描Trigger表,获取符合两种情况的Triger:
1、策略为-1,即MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY情况下,只要NEXT_FIRE_TIME小于当前时间的,就会被扫描出来,正常执行,相当于没有哑火处理的策略了。
2、策略不为-1的时候,必须要NEXT_FIRE_TIME小于当前时间,并大于哑火作业的时间阈值的作业,才会被扫描出来;而小于哑火作业时间阈值的作业,就会被当作哑火作业,抛到MisfireHandler线程执行。
MissFire Job判断逻辑
MisfireHandler线程,定时执行doRecoverMisfires方法,从QRTZ_TRIGGERS表中,获取哑火作业。
doRecoverMisfires方法中,调用recoverMisfiredJobs方法,获取哑火作业。
recoverMisfiredJobs方法中,通过getDelegate()获取操作数据库的一个对象实例,返回的是StdJDBCDelegate对象实例,然后调用改对象的hasMisfiredTriggersInState方法,传入一些参数用来筛选哑火作业。
其中getMisfireTime方法其实就是获取了当前时间减去阈值时间的一个对比时间,阈值默认是1分钟,那么getMisfireTime得到的时间就是当前时间的前一分钟。
package org.quartz.impl.jdbcjobstore;
public abstract class JobStoreSupport implements JobStore, Constants {
protected Class<? extends DriverDelegate> delegateClass = StdJDBCDelegate.class
protected long getMisfireTime() { //获取用于对比哑火作业的时间
long misfireTime = System.currentTimeMillis();
if (getMisfireThreshold() > 0) {
misfireTime -= getMisfireThreshold();
}
return (misfireTime > 0) ? misfireTime : 0;
}
protected RecoverMisfiredJobsResult recoverMisfiredJobs(
...
boolean hasMoreMisfiredTriggers =
getDelegate().hasMisfiredTriggersInState( //步骤3
conn, STATE_WAITING, getMisfireTime(),
maxMisfiresToHandleAtATime, misfiredTriggers);
...
}
protected RecoverMisfiredJobsResult doRecoverMisfires() throws JobPersistenceException {
...
transOwner = getLockHandler().obtainLock(conn, LOCK_TRIGGER_ACCESS);
result = recoverMisfiredJobs(conn, false); //步骤2
...
}
class MisfireHandler extends Thread {
private RecoverMisfiredJobsResult manage() {
...
RecoverMisfiredJobsResult res = doRecoverMisfires(); //步骤1
...
}
@Override
public void run() {
while (!shutdown) {
long sTime = System.currentTimeMillis();
RecoverMisfiredJobsResult recoverMisfiredJobsResult = manage();
...
}
}
}
}
hasMisfiredTriggersInState方法中,其实就是连接数据库,执行了“SELECT_HAS_MISFIRED_TRIGGERS_IN_STATE”语句,筛选出哑火作业。从SQL中可以看到筛选逻辑,其实就是同时满足下面三个条件:
哑火策略(MISFIRE_INSTR字段)不为“ Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY”,看名字也知道,就是筛选策略不是IGNORE(忽略哑火策略的)。
计划中的下次触发时间(NEXT_FIRE_TIME字段)小于第三步中得getMisfireTime,就是原定的触发时间,比当前时间前一分钟还要早的。
状态(TRIGGER_STATE字段)为Waiting的。
满足这三个条件的作业,就会被筛选成哑火作业,就是本该已经触发的作业,但是没有被触发。
package org.quartz.impl.jdbcjobstore;
public class StdJDBCDelegate implements DriverDelegate, StdJDBCConstants {
public boolean hasMisfiredTriggersInState(Connection conn, String state1,
long ts, int count, List<TriggerKey> resultList) throws SQLException {
PreparedStatement ps = null;
ResultSet rs = null;
try {
ps = conn.prepareStatement(rtp(SELECT_HAS_MISFIRED_TRIGGERS_IN_STATE)); //步骤4
ps.setBigDecimal(1, new BigDecimal(String.valueOf(ts)));
ps.setString(2, state1);
rs = ps.executeQuery();
...
}
public interface StdJDBCConstants extends Constants { //筛选SQL
String SELECT_HAS_MISFIRED_TRIGGERS_IN_STATE = "SELECT "
+ COL_TRIGGER_NAME + ", " + COL_TRIGGER_GROUP + " FROM "
+ TABLE_PREFIX_SUBST + TABLE_TRIGGERS + " WHERE "
+ COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST + " AND NOT ("
+ COL_MISFIRE_INSTRUCTION + " = " + Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY + ") AND "
+ COL_NEXT_FIRE_TIME + " < ? "
+ "AND " + COL_TRIGGER_STATE + " = ? "
+ "ORDER BY " + COL_NEXT_FIRE_TIME + " ASC, " + COL_PRIORITY + " DESC";
NEXT_FIRE_TIME更新策略
从上面的分析可以看出NEXT_FIRE_TIME这个字段是用于区分逻辑的关键字段,下面跟踪一下NEXT_FIRE_TIME更新策略。
根据上面的JobStoreSupport.acquireNextTrigger,获取到要运行的Trigger后,会把trigger抛出去执行,这一步是异步的。在这一步之后就会把NEXT_FIRE_TIME持久化到库里。
package org.quartz.impl.jdbcjobstore;
public abstract class JobStoreSupport implements JobStore, Constants {
protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
do {
currentLoopCount ++;
try {
List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);
...
for(TriggerKey triggerKey: keys) {
那什么时候去修改了NEXT_FIRE_TIME在内存里的值呢。
主要看CronTriggerImpl中的updateAfterMisfire,可以看到:
如果策略是MISFIRE_INSTRUCTION_FIRE_ONCE_NOW,就把NEXT_FIRE_TIME字段设置为当前时间,即下次触发时间设置为当前,那么当这个作业为发现是哑火的时候,就会马上执行一次,接下来再进入正常的调度流程。
如果策略是MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY,就啥也不做,不更新,留着旧的时间在库里,等到有资源的时候,自然就会重新调度起来,这个就能解决问题了。
package org.quartz.impl.triggers;
public class CronTriggerImpl extends AbstractTrigger<CronTrigger> implements CronTrigger, CoreTrigger {
@Override
public void updateAfterMisfire(org.quartz.Calendar cal) {
int instr = getMisfireInstruction();
if(instr == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY)
return;
if (instr == MISFIRE_INSTRUCTION_SMART_POLICY) {
instr = MISFIRE_INSTRUCTION_FIRE_ONCE_NOW;
}
if (instr == MISFIRE_INSTRUCTION_DO_NOTHING) {
Date newFireTime = getFireTimeAfter(new Date());
while (newFireTime != null && cal != null
&& !cal.isTimeIncluded(newFireTime.getTime())) {
newFireTime = getFireTimeAfter(newFireTime);
}
setNextFireTime(newFireTime);
} else if (instr == MISFIRE_INSTRUCTION_FIRE_ONCE_NOW) {
setNextFireTime(new Date()); //设置下次触发时间为当前时间。
}
}
最后
以上就是直率砖头为你收集整理的Dolphinscheduler定时调度没有启动Handling 4 trigger(s) that missed their scheduled fire-time问题问题分析问题解决的全部内容,希望文章能够帮你解决Dolphinscheduler定时调度没有启动Handling 4 trigger(s) that missed their scheduled fire-time问题问题分析问题解决所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复