我是靠谱客的博主 奋斗香水,最近开发中收集的这篇文章主要介绍定时任务系列(5)-Quartz绑定任务和触发器核心原理,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

上一篇创建完Scheduler后,下一步就是绑定关联一个JobDetailTrigger
接着看一下这个方法:
scheduler.scheduleJob(jobDetail, trigger);
首先,Scheduler有3个实现类:StdSchedulerRemoteSchedulerRemoteMBeanScheduler

  • StdScheduler:直接本地调用
  • RemoteScheduler:通过RMI远程代理
  • RemoteMBeanScheduler:通过JMX远程代理

我们在上一节中创建的就是默认的StdScheduler,所以我们接着看StdScheduler的scheduleJob方法。
StdScheduler类似一个包装类,包装了QuartzScheduler,其中提供了有关QuartzScheduler的方法
直接开始scheduleJob方法:

    public Date scheduleJob(JobDetail jobDetail,
            Trigger trigger) throws SchedulerException {
        validateState();

        if (jobDetail == null) {
            throw new SchedulerException("JobDetail cannot be null");
        }
        
        if (trigger == null) {
            throw new SchedulerException("Trigger cannot be null");
        }
        
        if (jobDetail.getKey() == null) {
            throw new SchedulerException("Job's key cannot be null");
        }

        if (jobDetail.getJobClass() == null) {
            throw new SchedulerException("Job's class cannot be null");
        }
        
        OperableTrigger trig = (OperableTrigger)trigger;

        if (trigger.getJobKey() == null) {
            trig.setJobKey(jobDetail.getKey());
        } else if (!trigger.getJobKey().equals(jobDetail.getKey())) {
            throw new SchedulerException(
                "Trigger does not reference given job!");
        }

        trig.validate();

        Calendar cal = null;
        if (trigger.getCalendarName() != null) {
            cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
        }
        Date ft = trig.computeFirstFireTime(cal);

        if (ft == null) {
            throw new SchedulerException(
                    "Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
        }

        resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
        notifySchedulerListenersJobAdded(jobDetail);
        notifySchedulerThread(trigger.getNextFireTime().getTime());
        notifySchedulerListenersSchduled(trigger);

        return ft;
    }

先校验一些参数不为空,然后关联一个日历,日历模式的后面出博客单独说,这里之用最简单、最常用的,直接看注册任务和触发器,如下:

resources.getJobStore().storeJobAndTrigger(jobDetail, trig);

获取JobStore后调用注册的方法,这里我们分两种:内存型RAMJobStore和持久型JobStoreSupport
1、先看RAMJobStore中的注册任务:

    public void storeJob(JobDetail newJob,
            boolean replaceExisting) throws ObjectAlreadyExistsException {
        JobWrapper jw = new JobWrapper((JobDetail)newJob.clone());

        boolean repl = false;

        synchronized (lock) {
            if (jobsByKey.get(jw.key) != null) {
                if (!replaceExisting) {
                    throw new ObjectAlreadyExistsException(newJob);
                }
                repl = true;
            }

            if (!repl) {
                // get job group
                HashMap<JobKey, JobWrapper> grpMap = jobsByGroup.get(newJob.getKey().getGroup());
                if (grpMap == null) {
                    grpMap = new HashMap<JobKey, JobWrapper>(100);
                    jobsByGroup.put(newJob.getKey().getGroup(), grpMap);
                }
                // add to jobs by group
                grpMap.put(newJob.getKey(), jw);
                // add to jobs by FQN map
                jobsByKey.put(jw.key, jw);
            } else {
                // update job detail
                JobWrapper orig = jobsByKey.get(jw.key);
                orig.jobDetail = jw.jobDetail; // already cloned
            }
        }
    }

先把JobDetail包装成JobWrapper,并使用HashMap<JobKey, JobWrapper> jobsByKey结构存储任务key和任务。
这里需要加锁,防止并发重复创建错误。首先判断缓存中是否存在key值的数据,如果存在,不允许重复创建相同JobName和JobGroup的任务,报错。
最后通过key和group缓存数据:jobsByKeyjobsByGroup

2、再看RAMJobStore中的注册触发器:

    public void storeTrigger(OperableTrigger newTrigger,
            boolean replaceExisting) throws JobPersistenceException {
        TriggerWrapper tw = new TriggerWrapper((OperableTrigger)newTrigger.clone());

        synchronized (lock) {
            if (triggersByKey.get(tw.key) != null) {
                if (!replaceExisting) {
                    throw new ObjectAlreadyExistsException(newTrigger);
                }
    
                removeTrigger(newTrigger.getKey(), false);
            }
    
            if (retrieveJob(newTrigger.getJobKey()) == null) {
                throw new JobPersistenceException("The job ("
                        + newTrigger.getJobKey()
                        + ") referenced by the trigger does not exist.");
            }

            // add to triggers by job
            List<TriggerWrapper> jobList = triggersByJob.get(tw.jobKey);
            if(jobList == null) {
                jobList = new ArrayList<TriggerWrapper>(1);
                triggersByJob.put(tw.jobKey, jobList);
            }
            jobList.add(tw);
            
            // add to triggers by group
            HashMap<TriggerKey, TriggerWrapper> grpMap = triggersByGroup.get(newTrigger.getKey().getGroup());
            if (grpMap == null) {
                grpMap = new HashMap<TriggerKey, TriggerWrapper>(100);
                triggersByGroup.put(newTrigger.getKey().getGroup(), grpMap);
            }
            grpMap.put(newTrigger.getKey(), tw);
            // add to triggers by FQN map
            triggersByKey.put(tw.key, tw);

            if (pausedTriggerGroups.contains(newTrigger.getKey().getGroup())
                    || pausedJobGroups.contains(newTrigger.getJobKey().getGroup())) {
                tw.state = TriggerWrapper.STATE_PAUSED;
                if (blockedJobs.contains(tw.jobKey)) {
                    tw.state = TriggerWrapper.STATE_PAUSED_BLOCKED;
                }
            } else if (blockedJobs.contains(tw.jobKey)) {
                tw.state = TriggerWrapper.STATE_BLOCKED;
            } else {
                timeTriggers.add(tw);
            }
        }
    }

类似注册任务,这里包装一个TriggerWrapper
和注册任务不同的是这里有3个缓存:triggersByJobtriggersByGrouptriggersByKey
注册好触发器后,判断触发器是否是暂停的,并将状态改为暂停;如果是已阻止的,状态改为已阻止。所有正常的触发器会存储在timeTriggers中。

3、JobStoreSupport中注册的任务:

    protected void storeJob(Connection conn, 
            JobDetail newJob, boolean replaceExisting)
        throws JobPersistenceException {

        boolean existingJob = jobExists(conn, newJob.getKey());
        try {
            if (existingJob) {
                if (!replaceExisting) { 
                    throw new ObjectAlreadyExistsException(newJob); 
                }
                getDelegate().updateJobDetail(conn, newJob);
            } else {
                getDelegate().insertJobDetail(conn, newJob);
            }
        } catch (IOException e) {
            throw new JobPersistenceException("Couldn't store job: "
                    + e.getMessage(), e);
        } catch (SQLException e) {
            throw new JobPersistenceException("Couldn't store job: "
                    + e.getMessage(), e);
        }
    }

首先看一下jobExists方法,这个查询数据库,看是否已经存在数据。

SELECT JOB_NAME FROM {0}JOB_DETAILS WHERE SCHED_NAME={1} AND JOB_NAME = ? AND JOB_GROUP = ?

如果任务存在,则报错。如果不存在就插入一条数据库。
这里获取DB操作的驱动,这里以MySQL为例,实现类为StdJDBCDelegate,具体插入sql如下:

INSERT INTO {0}JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP,DESCRIPTION,JOB_CLASS_NAME,IS_DURABLE,
IS_NONCONCURRENT,IS_UPDATE_DATA,REQUESTS_RECOVERY,JOB_DATA) VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?)

4、JobStoreSupport中注册触发器:

    protected void storeTrigger(Connection conn,
            OperableTrigger newTrigger, JobDetail job, boolean replaceExisting, String state,
            boolean forceState, boolean recovering)
        throws JobPersistenceException {

        boolean existingTrigger = triggerExists(conn, newTrigger.getKey());

        if ((existingTrigger) && (!replaceExisting)) { 
            throw new ObjectAlreadyExistsException(newTrigger); 
        }
        
        try {

            boolean shouldBepaused;

            if (!forceState) {
                //当前的group停止
                shouldBepaused = getDelegate().isTriggerGroupPaused(
                        conn, newTrigger.getKey().getGroup());

                if(!shouldBepaused) {
                    //所有的group停止
                    shouldBepaused = getDelegate().isTriggerGroupPaused(conn,
                            ALL_GROUPS_PAUSED);
                    //如果当前没有停止,但是所有的都停止,也要新增一条数据
                    if (shouldBepaused) {
                        getDelegate().insertPausedTriggerGroup(conn, newTrigger.getKey().getGroup());
                    }
                }

                if (shouldBepaused && (state.equals(STATE_WAITING) || state.equals(STATE_ACQUIRED))) {
                    state = STATE_PAUSED;
                }
            }

            if(job == null) {
                //SELECT * FROM {0}JOB_DETAILS WHERE SCHED_NAME={1} AND JOB_NAME = ? AND JOB_GROUP = ?
                job = retrieveJob(conn, newTrigger.getJobKey());
            }
            if (job == null) {
                throw new JobPersistenceException("The job ("
                        + newTrigger.getJobKey()
                        + ") referenced by the trigger does not exist.");
            }

            if (job.isConcurrentExectionDisallowed() && !recovering) { 
                state = checkBlockedState(conn, job.getKey(), state);
            }
            
            if (existingTrigger) {
                getDelegate().updateTrigger(conn, newTrigger, state, job);
            } else {
                getDelegate().insertTrigger(conn, newTrigger, state, job);
            }
        } catch (Exception e) {
            throw new JobPersistenceException("Couldn't store trigger '" + newTrigger.getKey() + "' for '" 
                    + newTrigger.getJobKey() + "' job:" + e.getMessage(), e);
        }
    }

判断是否存在触发器,具体sql如下:

SELECT TRIGGER_NAME FROM {0}TRIGGERS WHERE SCHED_NAME={1} AND TRIGGER_NAME= ? AND TRIGGER_GROUP = ?

查看是否存在已经停止的触发器组(当前组和所有组):

SELECT TRIGGER_GROUP FROM {0}PAUSED_TRIGGER_GRPS WHERE SCHED_NAME={1} AND TRIGGER_GROUP=?

最后,存在就修改数据库的触发器,不存在就新增

UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ?, JOB_DATA = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
UPDATE {0}TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ? WHERE SCHED_NAME = {1} AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
&&
INSERT INTO {0}TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, JOB_NAME, JOB_GROUP, DESCRIPTION, NEXT_FIRE_TIME, PREV_FIRE_TIME, TRIGGER_STATE, TRIGGER_TYPE, START_TIME, END_TIME, CALENDAR_NAME, MISFIRE_INSTR, JOB_DATA, PRIORITY)  VALUES({1}, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)

最后

以上就是奋斗香水为你收集整理的定时任务系列(5)-Quartz绑定任务和触发器核心原理的全部内容,希望文章能够帮你解决定时任务系列(5)-Quartz绑定任务和触发器核心原理所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(50)

评论列表共有 0 条评论

立即
投稿
返回
顶部