概述
上一篇创建完Scheduler
后,下一步就是绑定关联一个JobDetail
和Trigger
接着看一下这个方法:
scheduler.scheduleJob(jobDetail, trigger);
首先,Scheduler
有3个实现类:StdScheduler
、RemoteScheduler
和RemoteMBeanScheduler
。
StdScheduler
:直接本地调用RemoteScheduler
:通过RMI远程代理RemoteMBeanScheduler
:通过JMX远程代理
我们在上一节中创建的就是默认的StdScheduler
,所以我们接着看StdScheduler
的scheduleJob方法。
StdScheduler
类似一个包装类,包装了QuartzScheduler
,其中提供了有关QuartzScheduler
的方法
直接开始scheduleJob方法:
public Date scheduleJob(JobDetail jobDetail,
Trigger trigger) throws SchedulerException {
validateState();
if (jobDetail == null) {
throw new SchedulerException("JobDetail cannot be null");
}
if (trigger == null) {
throw new SchedulerException("Trigger cannot be null");
}
if (jobDetail.getKey() == null) {
throw new SchedulerException("Job's key cannot be null");
}
if (jobDetail.getJobClass() == null) {
throw new SchedulerException("Job's class cannot be null");
}
OperableTrigger trig = (OperableTrigger)trigger;
if (trigger.getJobKey() == null) {
trig.setJobKey(jobDetail.getKey());
} else if (!trigger.getJobKey().equals(jobDetail.getKey())) {
throw new SchedulerException(
"Trigger does not reference given job!");
}
trig.validate();
Calendar cal = null;
if (trigger.getCalendarName() != null) {
cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
}
Date ft = trig.computeFirstFireTime(cal);
if (ft == null) {
throw new SchedulerException(
"Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
}
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
notifySchedulerListenersJobAdded(jobDetail);
notifySchedulerThread(trigger.getNextFireTime().getTime());
notifySchedulerListenersSchduled(trigger);
return ft;
}
先校验一些参数不为空,然后关联一个日历,日历模式的后面出博客单独说,这里之用最简单、最常用的,直接看注册任务和触发器,如下:
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
获取JobStore后调用注册的方法,这里我们分两种:内存型RAMJobStore
和持久型JobStoreSupport
1、先看RAMJobStore
中的注册任务:
public void storeJob(JobDetail newJob,
boolean replaceExisting) throws ObjectAlreadyExistsException {
JobWrapper jw = new JobWrapper((JobDetail)newJob.clone());
boolean repl = false;
synchronized (lock) {
if (jobsByKey.get(jw.key) != null) {
if (!replaceExisting) {
throw new ObjectAlreadyExistsException(newJob);
}
repl = true;
}
if (!repl) {
// get job group
HashMap<JobKey, JobWrapper> grpMap = jobsByGroup.get(newJob.getKey().getGroup());
if (grpMap == null) {
grpMap = new HashMap<JobKey, JobWrapper>(100);
jobsByGroup.put(newJob.getKey().getGroup(), grpMap);
}
// add to jobs by group
grpMap.put(newJob.getKey(), jw);
// add to jobs by FQN map
jobsByKey.put(jw.key, jw);
} else {
// update job detail
JobWrapper orig = jobsByKey.get(jw.key);
orig.jobDetail = jw.jobDetail; // already cloned
}
}
}
先把JobDetail
包装成JobWrapper
,并使用HashMap<JobKey, JobWrapper> jobsByKey结构存储任务key和任务。
这里需要加锁,防止并发重复创建错误。首先判断缓存中是否存在key值的数据,如果存在,不允许重复创建相同JobName和JobGroup的任务,报错。
最后通过key和group缓存数据:jobsByKey
、jobsByGroup
。
2、再看RAMJobStore
中的注册触发器:
public void storeTrigger(OperableTrigger newTrigger,
boolean replaceExisting) throws JobPersistenceException {
TriggerWrapper tw = new TriggerWrapper((OperableTrigger)newTrigger.clone());
synchronized (lock) {
if (triggersByKey.get(tw.key) != null) {
if (!replaceExisting) {
throw new ObjectAlreadyExistsException(newTrigger);
}
removeTrigger(newTrigger.getKey(), false);
}
if (retrieveJob(newTrigger.getJobKey()) == null) {
throw new JobPersistenceException("The job ("
+ newTrigger.getJobKey()
+ ") referenced by the trigger does not exist.");
}
// add to triggers by job
List<TriggerWrapper> jobList = triggersByJob.get(tw.jobKey);
if(jobList == null) {
jobList = new ArrayList<TriggerWrapper>(1);
triggersByJob.put(tw.jobKey, jobList);
}
jobList.add(tw);
// add to triggers by group
HashMap<TriggerKey, TriggerWrapper> grpMap = triggersByGroup.get(newTrigger.getKey().getGroup());
if (grpMap == null) {
grpMap = new HashMap<TriggerKey, TriggerWrapper>(100);
triggersByGroup.put(newTrigger.getKey().getGroup(), grpMap);
}
grpMap.put(newTrigger.getKey(), tw);
// add to triggers by FQN map
triggersByKey.put(tw.key, tw);
if (pausedTriggerGroups.contains(newTrigger.getKey().getGroup())
|| pausedJobGroups.contains(newTrigger.getJobKey().getGroup())) {
tw.state = TriggerWrapper.STATE_PAUSED;
if (blockedJobs.contains(tw.jobKey)) {
tw.state = TriggerWrapper.STATE_PAUSED_BLOCKED;
}
} else if (blockedJobs.contains(tw.jobKey)) {
tw.state = TriggerWrapper.STATE_BLOCKED;
} else {
timeTriggers.add(tw);
}
}
}
类似注册任务,这里包装一个TriggerWrapper
。
和注册任务不同的是这里有3个缓存:triggersByJob
、triggersByGroup
、triggersByKey
注册好触发器后,判断触发器是否是暂停的,并将状态改为暂停;如果是已阻止的,状态改为已阻止。所有正常的触发器会存储在timeTriggers
中。
3、JobStoreSupport
中注册的任务:
protected void storeJob(Connection conn,
JobDetail newJob, boolean replaceExisting)
throws JobPersistenceException {
boolean existingJob = jobExists(conn, newJob.getKey());
try {
if (existingJob) {
if (!replaceExisting) {
throw new ObjectAlreadyExistsException(newJob);
}
getDelegate().updateJobDetail(conn, newJob);
} else {
getDelegate().insertJobDetail(conn, newJob);
}
} catch (IOException e) {
throw new JobPersistenceException("Couldn't store job: "
+ e.getMessage(), e);
} catch (SQLException e) {
throw new JobPersistenceException("Couldn't store job: "
+ e.getMessage(), e);
}
}
首先看一下jobExists方法,这个查询数据库,看是否已经存在数据。
SELECT JOB_NAME FROM {0}JOB_DETAILS WHERE SCHED_NAME={1} AND JOB_NAME = ? AND JOB_GROUP = ?
如果任务存在,则报错。如果不存在就插入一条数据库。
这里获取DB操作的驱动,这里以MySQL为例,实现类为StdJDBCDelegate
,具体插入sql如下:
INSERT INTO {0}JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP,DESCRIPTION,JOB_CLASS_NAME,IS_DURABLE,
IS_NONCONCURRENT,IS_UPDATE_DATA,REQUESTS_RECOVERY,JOB_DATA) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?)
4、JobStoreSupport
中注册触发器:
protected void storeTrigger(Connection conn,
OperableTrigger newTrigger, JobDetail job, boolean replaceExisting, String state,
boolean forceState, boolean recovering)
throws JobPersistenceException {
boolean existingTrigger = triggerExists(conn, newTrigger.getKey());
if ((existingTrigger) && (!replaceExisting)) {
throw new ObjectAlreadyExistsException(newTrigger);
}
try {
boolean shouldBepaused;
if (!forceState) {
//当前的group停止
shouldBepaused = getDelegate().isTriggerGroupPaused(
conn, newTrigger.getKey().getGroup());
if(!shouldBepaused) {
//所有的group停止
shouldBepaused = getDelegate().isTriggerGroupPaused(conn,
ALL_GROUPS_PAUSED);
//如果当前没有停止,但是所有的都停止,也要新增一条数据
if (shouldBepaused) {
getDelegate().insertPausedTriggerGroup(conn, newTrigger.getKey().getGroup());
}
}
if (shouldBepaused && (state.equals(STATE_WAITING) || state.equals(STATE_ACQUIRED))) {
state = STATE_PAUSED;
}
}
if(job == null) {
//SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME={1} AND JOB_NAME = ? AND JOB_GROUP = ?
job = retrieveJob(conn, newTrigger.getJobKey());
}
if (job == null) {
throw new JobPersistenceException("The job ("
+ newTrigger.getJobKey()
+ ") referenced by the trigger does not exist.");
}
if (job.isConcurrentExectionDisallowed() && !recovering) {
state = checkBlockedState(conn, job.getKey(), state);
}
if (existingTrigger) {
getDelegate().updateTrigger(conn, newTrigger, state, job);
} else {
getDelegate().insertTrigger(conn, newTrigger, state, job);
}
} catch (Exception e) {
throw new JobPersistenceException("Couldn't store trigger '" + newTrigger.getKey() + "' for '"
+ newTrigger.getJobKey() + "' job:" + e.getMessage(), e);
}
}
判断是否存在触发器,具体sql如下:
SELECT TRIGGER_NAME FROM {0}TRIGGERS WHERE SCHED_NAME={1} AND TRIGGER_NAME= ? AND TRIGGER_GROUP = ?
查看是否存在已经停止的触发器组(当前组和所有组):
SELECT TRIGGER_GROUP FROM {0}PAUSED_TRIGGER_GRPS WHERE SCHED_NAME={1} AND TRIGGER_GROUP=?
最后,存在就修改数据库的触发器,不存在就新增
UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ?, JOB_DATA = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
&&
INSERT INTO {0}TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, JOB_NAME, JOB_GROUP, DESCRIPTION, NEXT_FIRE_TIME, PREV_FIRE_TIME, TRIGGER_STATE, TRIGGER_TYPE, START_TIME, END_TIME, CALENDAR_NAME, MISFIRE_INSTR, JOB_DATA, PRIORITY) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
最后
以上就是奋斗香水为你收集整理的定时任务系列(5)-Quartz绑定任务和触发器核心原理的全部内容,希望文章能够帮你解决定时任务系列(5)-Quartz绑定任务和触发器核心原理所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复