我是靠谱客的博主 谦让导师,最近开发中收集的这篇文章主要介绍Quartz与Spring结合动态控制任务RAM版,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

使用quart 动态的创建控制任务,任务数据是放置到数据库中的。自己建立了一个任务表,和一个历史表
下面代码只开放部分代码。(工具类中差不多是全的)工具类中操作参数和数据库的。可以忽略不计。由于某些原因。暂时不能全部放代码,自己看懂了就添加就行。
ScheduleJob 是保存任务信息的bean, ScheduleJobMapper 是mybatis的dao接口。
比较简单。把 mapper去掉。这个主要的工具类就可以直接运行测试

在 quartz.properties 中可以配置使用数据库版还是内存RAM版(该文件在quartz中有一个默认的配置文件)

实现思路

  1. 根据quartz Api 实现主要工具类中的一些常用操作。
  2. 单机main方法测试正常之后,与spring结合
  3. 首先要知道怎么启动一个入口任务,去每隔一定的时间去获取数据库中配置的任务,查看是否有新增任务,或则任务中的状态是否有手动控制更改的,然后进行相应的控制任务。 为了方便获取spring容器中的服务对象,没有使用一个job来定时跑任务,而是implements ApplicationListener<ContextRefreshedEvent>接口,让项目启动完成之后,触发我们自定义的一个线程。

需要注意实现中的坑:

  1. 在单独main方法中测试启动、创建、控制任务都ok,但是开启循环抓取任务信息根据状态进行对应的操作的时候,发现暂停等一些控制操作无效了。
    问题出现的原因:

    1. 调度工厂start 会把负面状态(比如暂停)都给变成启动状态。
    2. 更新tigger时间,会让暂停的任务重新运行

    解决方案:在循环遍历之前调用调度工厂的start方法,循环中有条件的判断 是否需要更新时间。(比如本实现中就专门增加了一个状态叫做更新世间)

  2. 在spring集合中使用quarts自带的实现工厂(比如:Scheduler scheduler = new StdSchedulerFactory().getScheduler();),会出现tomcat关掉之后,还有一个独立的进程跑任务
    解决方案:使用spring的调度工厂实现org.springframework.scheduling.quartz.SchedulerFactoryBean,该jar包在spring-context-support中

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
        </dependency>

pom.xml

            <dependency>
                <groupId>org.quartz-scheduler</groupId>
                <artifactId>quartz</artifactId>
                <version>2.2.1</version>
            </dependency>
            <dependency>
                <groupId>org.quartz-scheduler</groupId>
                <artifactId>quartz-jobs</artifactId>
                <version>2.2.1</version>
            </dependency>

主要的工具类

import org.apache.log4j.Logger;
import org.zq.beans.parameter.system.ScheduleJob;
import org.zq.core.parameter.system.dao.mapper.ScheduleJobMapper;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.ParseException;

/**
 * Created by zq on 2015/5/26.
 */
@Component
public class ScheduleUtils {
    private static Logger log = Logger.getLogger(ScheduleUtils.class);
    /** 自动状态:任务创建失败 **/
    public static final String RUN_STATUS_0 = "0";
    /** 自动状态:准备运行 **/
    public static final String RUN_STATUS_1 = "1";
    /** 自动状态:运行中 **/
    public static final String RUN_STATUS_2 = "2";
    /** 自动状态:手动暂停 **/
    public static final String RUN_STATUS_3 = "3";
    /** 自动状态:更新任务失败 **/
    public static final String RUN_STATUS_4 = "4";
    /** 自动状态:运行完成 **/
    public static final String RUN_STATUS_5 = "5";

    /** 自动状态: -1:由运行状态产生的错误导致任务未运行等(此状态由系统维护)**/
    public static final String STATUS_1_1 = "-1";
    /** 手动控制状态:删除 **/
    public static final String STATUS_0 = "0";
    /** 手动控制状态:自动执行**/
    public static final String STATUS_1 = "1";
    /** 手动控制状态:暂停 **/
    public static final String STATUS_2 = "2";
    /** 手动控制状态:恢复 **/
    public static final String STATUS_3 = "3";
    /** 手动控制状态:更新任务:只能更新时间 **/
    public static final String STATUS_4 = "4";
    /** 手动控制状态:立即执行,并且只执行一次 **/
    public static final String STATUS_5 = "5";



    /** 默认job group 名称**/
    public static final String DEFAULT_JOB_GROUP = "DEFAULT_JOB_GROUP_zq";
    /** 默认 trigger 名称 **/
    public static final String DEFAULT_TRIGGER = "DEFAULT_TRIGGER_zq";

    /**
     * 任务信息,默认的参数名称
     */
    public static final String JOB_PARAM_KEY = "JOB_PARAM_KEY";

    @Autowired
    private static ScheduleJobMapper scheduleJobMapper;
    /**
     * 获取表达式触发器
     *
     * @param scheduler scheduler调度器
     * @param jobName the job name名称
     * @param jobGroup the job group组名称
     * @return cron trigger
     */
    public static CronTrigger getCronTrigger(Scheduler scheduler, String jobName, String jobGroup) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
            return (CronTrigger) scheduler.getTrigger(triggerKey);
        } catch (SchedulerException e) {
            log.error("获取定时任务CronTrigger出现异常", e);
        }
        return null;
    }

    /**
     * 创建任务
     *
     * @param scheduler the scheduler
     * @param scheduleJob the schedule job
     */
    public static boolean createScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) {
        return createScheduleJob(scheduler, scheduleJob.getJobName(), scheduleJob.getJobGroup(),
                scheduleJob.getCronExpression(), false, scheduleJob);
    }

    /**
     * 创建定时任务
     *
     * @param scheduler the scheduler
     * @param jobName the job name
     * @param jobGroup the job group
     * @param cronExpression the cron expression
     * @param isSync the is sync
     * @param param the param
     */
    public static boolean createScheduleJob(Scheduler scheduler, String jobName, String jobGroup,
                                         String cronExpression, boolean isSync, ScheduleJob param) {
        Class<? extends Job> jobClass = null;
        Class clazz = parth(param);
        if(clazz == null){
            return false;
        }else{
            jobClass = clazz;
        }

        //构建job信息
        JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroup).build();

        //放入参数,运行时的方法可以获取
        jobDetail.getJobDataMap().put(JOB_PARAM_KEY, param);

        //表达式调度构建器
        CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);

        //按新的cronExpression表达式构建一个新的trigger
        CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, jobGroup)
                .withSchedule(scheduleBuilder).build();

        try {
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (SchedulerException e) {
            log.error("创建定时任务失败", e);
            param.setStatus(STATUS_1_1);
            param.setRunStatus(RUN_STATUS_0);
            return false;
        }
        param.setRunStatus(RUN_STATUS_1);
        return true;
    }


    /**
     * 更新定时任务
     *
     * @param scheduler the scheduler
     * @param scheduleJob the schedule job
     */
    public static boolean updateScheduleJob(Scheduler scheduler, ScheduleJob scheduleJob) {
        return updateScheduleJob(scheduler, scheduleJob.getJobName(), scheduleJob.getJobGroup(),
                scheduleJob.getCronExpression(), false, scheduleJob);
    }

    /**
     * 更新定时任务
     *
     * @param scheduler the scheduler
     * @param jobName the job name
     * @param jobGroup the job group
     * @param cronExpression the cron expression
     * @param isSync the is sync
     * @param param the param
     */
    public static boolean updateScheduleJob(Scheduler scheduler, String jobName, String jobGroup,
                                         String cronExpression, boolean isSync, ScheduleJob param) {
        Class<? extends Job> jobClass = null;
        Class clazz = parth(param);
        if(clazz == null){
            return false;
        }else{
            jobClass = clazz;
        }
        try {
            JobDetail jobDetail = scheduler.getJobDetail(getJobKey(jobName, jobGroup));
            jobDetail = jobDetail.getJobBuilder().ofType(jobClass).build();
//            //更新参数 实际测试中发现无法更新
//            JobDataMap jobDataMap = jobDetail.getJobDataMap();
//            jobDataMap.put(JOB_PARAM_KEY, param);
//            jobDetail.getJobBuilder().usingJobData(jobDataMap);
            TriggerKey triggerKey = ScheduleUtils.getTriggerKey(jobName, jobGroup);
            //表达式调度构建器
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            //按新的cronExpression表达式重新构建trigger
            trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
            //按新的trigger重新设置job执行
            scheduler.rescheduleJob(triggerKey, trigger);

        } catch (SchedulerException e) {
            param.setStatus(STATUS_3); //更新失败,处理任务暂停
            param.setRunStatus(RUN_STATUS_5);
            log.error("更新定时任务失败", e);
            return false;
        }
        return true;
    }

    /**
     * 获取class
     * @param param
     * @return
     */
    public static Class parth(ScheduleJob param){
        Class clazz = null;
        try {
             clazz  =  Class.forName(param.getJobClassName());
        } catch (Exception e) {
            log.error("JobClassName : " + param.getJobClassName() + ",未找到该类·");
            param.setStatus(STATUS_1);
            param.setRunStatus(RUN_STATUS_0);
        }
        return clazz;
    }

    /**
     * 获取jobKey
     *
     * @param jobName the job name
     * @param jobGroup the job group
     * @return the job key
     */
    public static JobKey getJobKey(String jobName, String jobGroup) {
        return JobKey.jobKey(jobName, jobGroup);
    }
    /**
     * 获取触发器key,创建的时候 就是以 jobName 和 job group 命名的
     *
     * @param jobName
     * @param jobGroup
     * @return
     */
    public static TriggerKey getTriggerKey(String jobName, String jobGroup) {
        return TriggerKey.triggerKey(jobName, jobGroup);
    }

    /**
     * 暂停任务
     *
     * @param scheduler
     * @param jobName
     * @param jobGroup
     */
    public static void pauseJob(Scheduler scheduler, String jobName, String jobGroup) {
        JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
        try {
            scheduler.pauseJob(jobKey);
            scheduler.pauseTrigger(getTriggerKey(jobName, jobGroup));//暂停触发器
        } catch (SchedulerException e) {
            log.error("暂停定时任务失败", e);
        }
    }

    /**
     * 恢复一个定时任务
     * @param scheduler
     * @param jobName
     * @param jobGroup
     */
    public static void resumeJob(Scheduler scheduler, String jobName, String jobGroup) {
        JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
        try {
            scheduler.resumeJob(jobKey);
            scheduler.resumeTrigger(getTriggerKey(jobName, jobGroup));
        } catch (SchedulerException e) {
            log.error("恢复定时任务失败", e);
        }
    }

    /**
     * 删除定时任务
     *
     * @param scheduler
     * @param jobName
     * @param jobGroup
     */
    public static boolean deleteScheduleJob(Scheduler scheduler, String jobName, String jobGroup) {
        try {
            scheduler.deleteJob(getJobKey(jobName, jobGroup));
            scheduler.pauseTrigger(getTriggerKey(jobName, jobGroup));//停止触发器
            scheduler.unscheduleJob(getTriggerKey(jobName, jobGroup));//移除触发器
        } catch (SchedulerException e) {
            log.error("删除定时任务失败", e);
            return false;
        }
        return true;
    }

    public static void main(String[] args) throws SchedulerException, ParseException, InterruptedException {
        Scheduler scheduler = new StdSchedulerFactory().getScheduler();
        ScheduleJob scheduleJob = new ScheduleJob();
        scheduleJob.setScheduleJobId(1);
        scheduleJob.setStatus(STATUS_2);
        scheduleJob.setJobName("test");
        scheduleJob.setJobGroup("testGroup");
        scheduleJob.setJobClassName("org.zq.core.common.util.schedule.job.TestJob");
        scheduleJob.setCronExpression("0/2 * * * * ?");
        ScheduleUtils.createScheduleJob(scheduler,scheduleJob);
        //启动
        if (!scheduler.isShutdown()) {
            scheduler.start();
        }
        Thread.sleep(10000);
        ScheduleUtils.deleteScheduleJob(scheduler,scheduleJob.getJobName(),scheduleJob.getJobGroup());
    }
}

与spring结合,初始化任务的一个类

import org.apache.log4j.Logger;
import org.zq.beans.parameter.system.ScheduleJob;
import org.zq.beans.parameter.system.ScheduleJobHistory;
import org.zq.core.common.util.BeanUtils;
import org.zq.core.parameter.system.dao.mapper.ScheduleJobMapper;
import org.zq.core.parameter.system.dao.mapper.ScheduleJobHistoryMapper;
import org.quartz.CronTrigger;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.util.CollectionUtils;

import java.util.Date;
import java.util.List;

/**
 * Created by zq on 2015/5/26.
 * 在xml中初始化该类,方便控制是否需要启动定时任务处理
 */
public class InItJob implements ApplicationListener<ContextRefreshedEvent> {
    private static Logger log = Logger.getLogger(InItJob.class);
    @Autowired
    private ScheduleJobMapper scheduleJobMapper; 
    @Autowired
    private Scheduler scheduler;  // 该对象是spring工厂配置自动产生的一个调度对象
    @Autowired
    private ScheduleJobHistoryMapper scheduleJobHistoryMapper;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext().getParent() == null) { //spring mvc  中有两个容器
            init();
        }
    }

    public void init() {
        log.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 启动初始化任务" + scheduler.toString());
        try {
            if (!scheduler.isShutdown()) {
                scheduler.start();
            }
        } catch (Exception e) {

        }

        new Thread(new Init()).start();
    }

    class Init implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    running();
                    log.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 任务运行中");
                    Thread.sleep(20000);  //每20秒 重新抓去一次数据库任务信息
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        public void running() throws SchedulerException {
            List<ScheduleJob> list = scheduleJobMapper.selectAll(); //从数据库获取所有的任务信息
            if (CollectionUtils.isEmpty(list)) {
                return;
            }
            for (ScheduleJob job : list) {
                if (handleStatus(scheduler, job)) continue;
                CronTrigger cronTrigger = ScheduleUtils.getCronTrigger(scheduler, job.getJobName(), job.getJobGroup());
                //不存在,创建一个
                if (cronTrigger == null) {
                    ScheduleUtils.createScheduleJob(scheduler, job);
                }
                job.setModifyById("system");
                job.setModifyDate(new Date());
                scheduleJobMapper.updateByPrimaryKey(job);
            }

        }

        /**
         * 处理状态,由于某些原因,以下代码不能全部公开。但是很简单逻辑就是,根据读取到的任务状态 调用工具类中的各种操作,达到能动态控制任务的一个功能
         *
         * @param scheduler
         * @param job
         * @return 返回true。不继续执行循环下面的代码
         */
        private boolean handleStatus(Scheduler scheduler, ScheduleJob job) {
            String status = job.getStatus();
            String runStatus = job.getRunStatus();

            CronTrigger cronTrigger = ScheduleUtils.getCronTrigger(scheduler, job.getJobName(), job.getJobGroup());

            //判断运行状态
            //如果是已经运行完成:则移动到历史表
            if (ScheduleUtils.RUN_STATUS_5.equals(runStatus)) {
                scheduleJobMapper.deleteByPrimaryKey(job.getScheduleJobId());
                ScheduleJobHistory sjh = new ScheduleJobHistory();
                BeanUtils.copyProperties(sjh, job);
                scheduleJobHistoryMapper.insertSelective(sjh);
            }
            //如果是暂停
            if (ScheduleUtils.STATUS_2.equals(status)) { //如果是暂停状态
                if (cronTrigger == null) { //状态异常
                    log.error("暂停状态异常:任务调度中没有找到对应的job!");
                    return true;
                }
                ScheduleUtils.pauseJob(scheduler, job.getJobName(), job.getJobGroup());
                job.setRunStatus(ScheduleUtils.RUN_STATUS_3);
            }
            return false;
        }
    }
}

另外一种启动方式

public class ScheduleJobInit {
    @Autowired
    InItJob inItJob;
    /**
     * 项目启动时初始化
     */
    @PostConstruct  //该注解是在项目启动完成之后触发该任务。同样也需要在xml中配置该类的bean
    public void init() {
        inItJob.init();
    }
}

spring.xml中配置bean

    <!-- 调度任务入口类 -->
    <bean id="inItJob" class="org.zq.core.common.util.schedule.util.InItJob"/>
    <bean id="schedulerFactoryBean" class="org.springframework.scheduling.quartz.SchedulerFactoryBean" />

最后

以上就是谦让导师为你收集整理的Quartz与Spring结合动态控制任务RAM版的全部内容,希望文章能够帮你解决Quartz与Spring结合动态控制任务RAM版所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(101)

评论列表共有 0 条评论

立即
投稿
返回
顶部