我是靠谱客的博主 傻傻萝莉,最近开发中收集的这篇文章主要介绍基于springboot和quartz的任务调度系统,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

一、简述

 项目地址:https://github.com/littlechare/job-manager

 项目详细的信息请参看项目的README

下载地址:https://download.csdn.net/download/w172087242/10333176

  之前想写一个关于定时任务的组建,便于项目中快速集成;

  写完之后,又继续在此基础上演变成系统,当然该系统还缺少很多东西;

  如业务方的bid生成与验证、任务回调响应报文的规则、任务调度的监控、如何做HA等等;

  因为是个人项目,所以用了部分下班时间和本次清明节的空闲时间实现,

  所以项目可定也有不足的地方,但是如果开发者想使用,也可以下载下来快速

 集成,也可在此基础上加以修改。


二、项目功能简述

2.1 任务类型支持说明

任务类型 说明 表达式示例 备注
CRONCron类型的任务,不支持表达式更改0/20 * * * * *? 每20秒调用一次
TRIGGER Trigger类型的任务,支持表达式更改0 0 4 * * *? 每天凌晨4点调用
FIXED Fixed类型任务,不支持表达式更改fixed=5000 单位毫秒,表示每5秒调用一次
FIXED_DELAYFixed_delay任务,不支持表达式更改fixed=5000,delay=10000 单位毫秒,表示延迟10秒后执行

2.2 业务方说明

 1.业务方需要自己维护业务方编号(bid)和任务编号;

 2.支持任务的随时取消、随时更改、随时添加等,回调地址一http(s)打头

2.3 模块说明与类说明

模块说明
模块名称 功能 说明
job-dispatcher-base 任务基础模块 定时任务能力提供者(可抽离成组件)
job-dispatcher-biz任务业务模块 业务管理与任务触发模块
job-dispatcher-manager任务管理模块 管理任务变动并发出变动事件提醒(分布部署可基于消息进行改进)
job-dispatcher-test测试模块 独立模块,含有一个接口,便于测试任务回调

base模块类说明
类名(简要名称,前缀省略com.littlehow.job) 功能 说明
base.config.ScheduleTaskConfig定时任务配置器 任务的核心管理器
base.config.TaskConfig定时任务缓存 缓存任务信息
base.config.TaskType定时任务类型 枚举CRON,TRIGGER,FIXED,FIXED_DELAY
base.BaseTask任务基础信息 任务实体bean,实现runnable和initializingbean
cron基础类 因为csdn编辑器问题,类名写于此BaseCronTask
base.BaseFixedTaskfixed基础类 实现baseTask类,与cron类似
ScheduleBaseServiceservice类 对外暴露服务类
manager模块类说明
类名(要名简称,前缀省略com.littlehow.job) 功能 说明
manager.api.TaskManagerService任务持久化管理apimanager模块可以独立出去,所以该api为唯一访问接口
manager.event.TaskEvent 任务事件 任务发送的事件实体,继承ApplicationEvent
manager.event.TaskEventType 任务事件类型 枚举ADD,UPDATE,REMOVE
manager.support.MysqlTaskManagerSupport api实现 mysql持久化任务的实现
biz模块类说明
类名(要名简称,前缀省略com.littlehow.job) 功能 说明
advice.ExceptionAdvice 异常统一处理 
advice.SuccessResponseAdvice 成功统一处理 
config.InterceptorConfig 拦截器配置 
interceptor.BusinessInterceptor 拦截器实现主要运用验证和获取header中的bid
interceptor.BusinessContext 上下文bid上下文管理
listener.TaskEventListener 任务事件监听 监听任务事件,实现ApplicationListener接口
service.CallbackExecuteService 回调处理服务 定时任务的回调地址调用
service.TaskExecuteService 任务处理服务 
controller.TaskManagerController 任务管理web接口 支持任务的增删改查
init.InitTask 初始化任务 任务系统重启后需要重新将已有任务加入任务队列
vo.ResponseEntity 相应参数 
vo.ArgumentsException 参数异常 校验参数时,如果不合法会抛出此异常
JobStart 启动类 springboot项目启动类
test模块类说明
类名(要名简称,前缀省略com.littlehow.job)功能 说明
test.TestController 测试controller 
test.TestStart 测试启动类springboot启动类

三、任务系统代码示例;

3.1 任务配置类(ScheduleTaskConfig)

package com.littlehow.job.base.config;
import com.littlehow.job.base.BaseTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.*;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import java.util.Date;
@Lazy(false)
@Configurable
@Component
@EnableScheduling
public class ScheduleTaskConfig implements SchedulingConfigurer {
private Logger log = LoggerFactory.getLogger(ScheduleTaskConfig.class);
private final static String TASK_NOT_EXISTS = "not exists";
private final static String TASK_EXISTS = "exists";
private final static String FAILURE = "failure";
private final static String SUCCESS = "success";
private ScheduledTaskRegistrar scheduledTaskRegistrar;
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
this.scheduledTaskRegistrar = scheduledTaskRegistrar;
initTask();
}
/**
* 添加任务
* @param task
* @return
*/
public String addTask(BaseTask task) {
if (scheduledTaskRegistrar == null || task == null) {
return FAILURE;
}
if (TaskConfig.containsTask(task.getId())) {
return TASK_EXISTS;
}
try {
addTask0(task);
TaskConfig.addTask(task);
return SUCCESS;
} catch (Exception e) {
log.error("新增定时任务失败:" + task, e);
throw e;
}
}
/**
* 改变任务执行频率
* @param taskId
* @param expression
* @return
*/
public String changeTask(String taskId, String expression) {
BaseTask baseTask = TaskConfig.getTask(taskId);
if (baseTask == null || TaskType.TRIGGER != baseTask.taskType || expression == null) {
return TASK_NOT_EXISTS;
}
log.info("change trigger expression:(id=" + taskId + ",expression=" + expression+")");
baseTask.setExpression(expression);
return SUCCESS;
}
/**
* 取消定时任务
* @param taskId
* @return
*/
public String cancelTask(String taskId) {
if (!TaskConfig.containsTask(taskId)) {
return TASK_NOT_EXISTS;
}
try {
log.info("cancel task:" + taskId);
TaskConfig.removeTask(taskId).getScheduledTask().cancel();
} catch (Exception e) {
log.error("取消任务失败:" + taskId, e);
throw e;
}
return SUCCESS;
}
/**
* 初始化已配置任务
*/
private void initTask() {
TaskConfig.getTasks().forEach(task -> addTask0(task));
}
private void addTask0(BaseTask task) {
log.info("add task:" + task);
switch (task.taskType) {
case TRIGGER: task.setScheduledTask(addTriggerTask(task));
break;
case CRON: task.setScheduledTask(addCronTask(task, task.getExpression()));
break;
case FIXED_RATE: task.setScheduledTask(addFixedRateTask(task, task.interval()));
break;
case FIXED_DELAY: task.setScheduledTask(addFixedDelayTask(task, task.interval(), task.delay()));
break;
default:
}
}
/**
* 添加不可改变时间表的定时任务
* @param task
*/
private ScheduledTask addCronTask(Runnable task, String expression) {
return scheduledTaskRegistrar.scheduleCronTask(new CronTask(task, expression));
}
/**
* 添加可变时间task
* @param task
* @return
*/
private ScheduledTask addTriggerTask(BaseTask task) {
return scheduledTaskRegistrar.scheduleTriggerTask(new TriggerTask(task, triggerContext -> {
CronTrigger trigger = new CronTrigger(task.getExpression());
Date nextExec = trigger.nextExecutionTime(triggerContext);
return nextExec;
}));
}
/**
* 设置固定频率的定时任务
* @param task
* @param interval
*/
private ScheduledTask addFixedRateTask(Runnable task, long interval) {
return scheduledTaskRegistrar.scheduleFixedRateTask(new IntervalTask(task, interval, 0L));
}
/**
* 设置延迟以固定频率执行的定时任务
* @param task
* @param interval
* @param delay
*/
private ScheduledTask addFixedDelayTask(Runnable task, long interval, long delay) {
return scheduledTaskRegistrar.scheduleFixedDelayTask(new IntervalTask(task, interval, delay));
}
}

3.2 任务基本实体类(BaseTask)

package com.littlehow.job.base;
import com.littlehow.job.base.config.TaskConfig;
import com.littlehow.job.base.config.TaskType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.config.ScheduledTask;
/**
* 基本的任务配置类
*/
public abstract class BaseTask implements Runnable,InitializingBean {
protected final Logger log = LoggerFactory.getLogger(this.getClass());
public final TaskType taskType;
private ScheduledTask scheduledTask;
private final String id;
public BaseTask(TaskType taskType, String id) {
this.taskType = taskType;
this.id = id;
}
/**
*
获取任务表达式如:0 0 0/1 * * *? (每个整点执行)
* @return
*/
public abstract String getExpression();
/**
* 固定频率执行的时间间隔
* @return
*/
public abstract long interval();
/**
* 固定频率执行的延迟时间
* @return
*/
public abstract long delay();
/**
* 设置任务表达式
* @param expression
*/
public abstract void setExpression(String expression);
/**
* 获取任务唯一标识
* @return
*/
public String getId() {
return id;
}
public final ScheduledTask getScheduledTask() {
return scheduledTask;
}
public final void setScheduledTask(ScheduledTask scheduledTask) {
this.scheduledTask = scheduledTask;
}
@Override
public void afterPropertiesSet() {
TaskConfig.addTask(this);
}
public String toString() {
return this.getClass().getSimpleName() + "(id:" + id + ",expression:" + getExpression()
+ ",type:" + taskType + ",interval:" + interval()
+ ", delay:" + delay() + ")";
}
}

3.3 任务执行类(TaskExecuteService)

package com.littlehow.job.service;
import com.littlehow.job.base.BaseCronTask;
import com.littlehow.job.base.BaseFixedTask;
import com.littlehow.job.base.ScheduleBaseService;
import com.littlehow.job.manager.pojo.TaskDto;
import com.littlehow.job.manager.pojo.TaskType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Service
public class TaskExecuteService {
@Autowired
private ScheduleBaseService scheduleBaseService;
@Autowired
private CallbackExecuteService executeService;
private static final Pattern fixed = Pattern.compile("fixed=(\d+)\s*(,\s*delay=(\d+))?");
/**
* 新增任务
* 因表达式解析错误可能会抛出异常
* @param taskDto
*/
public void addTask(TaskDto taskDto) {
if (taskDto == null) {
return;
}
String taskId = getTaskId(taskDto);
CallbackExecuteService.addOrUpdateTask(taskId, taskDto.getCallbackUrl());
TaskType taskType = taskDto.getTaskType();
switch (taskType) {
case FIXED:
case FIXED_DELAY:
long[] params = getFixedParameters(taskDto.getExpression());
BaseFixedTask baseFixedTask = new BaseFixedTask(taskId,
params[0], params[1]) {
@Override
public void run() {
executeService.execute(this.getId());
}
};
scheduleBaseService.addTask(baseFixedTask);
break;
case CRON:
case TRIGGER:
BaseCronTask baseCronTask = new BaseCronTask(taskType == TaskType.CRON ?
com.littlehow.job.base.config.TaskType.CRON :
com.littlehow.job.base.config.TaskType.TRIGGER, taskId) {
@Override
public void run() {
executeService.execute(this.getId());
}
};
baseCronTask.setExpression(taskDto.getExpression());
scheduleBaseService.addTask(baseCronTask);
}
}
/**
* 修改任务
* @param taskDto
*/
public void updateTask(TaskDto taskDto) {
if (!StringUtils.isEmpty(taskDto.getExpression())) {
//修改表达式
scheduleBaseService.changeTask(getTaskId(taskDto), taskDto.getExpression());
}
if (!StringUtils.isEmpty(taskDto.getCallbackUrl())) {
//修改回调地址
CallbackExecuteService.addOrUpdateTask(getTaskId(taskDto), taskDto.getCallbackUrl());
}
}
/**
* 删除任务
* @param taskDto
*/
public void removeTask(TaskDto taskDto) {
String taskId = getTaskId(taskDto);
scheduleBaseService.removeTask(taskId);
CallbackExecuteService.removeTask(taskId);
}
/**
* 获取fixed类型的实际参数信息
* @param expression
* @return
*/
private static long[] getFixedParameters(String expression) {
Matcher matcher = fixed.matcher(expression);
if (matcher.find()) {
long[] fixedParam = {0L, 0L};
fixedParam[0] = Long.parseLong(matcher.group(1));
String delay = matcher.group(3);
if (delay != null) {
fixedParam[1] = Long.parseLong(delay);
}
return fixedParam;
}
throw new IllegalArgumentException("invalid expression:" +expression);
}
/**
* 获取任务编号
* @param task
* @return
*/
private String getTaskId(TaskDto task) {
return task.getBusinessId() + "-" + task.getTaskId();
}
}

3.4 任务回调地址处理类(CallbackExecuteService)

package com.littlehow.job.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.util.HashMap;
import java.util.Map;
/**
* 执行调用定时任务的回调地址
* 可以约定定时任务的回执报文,便于进一步解析
* littlehow 2018/4/7
*/
@Service
public class CallbackExecuteService {
private final static Logger log = LoggerFactory.getLogger(CallbackExecuteService.class);
/**
* 缓存任务与回调之间的关系
*/
private static final Map<String, String> callbackInfo = new HashMap<>();
/**
* http任务处理接口
*/
private RestTemplate restTemplate = new RestTemplate();
/**
* 执行远程任务
* @param taskId
*/
public void execute(String taskId) {
String url = callbackInfo.get(taskId);
if (url == null) {
log.error("任务[" + taskId + "]的回调地址为空");
//FIXME 可以在此处加上监控或通知,也可以已异常的形式抛出,用aop统一处理
return;
}
try {
//FIXME 如果制定了返回值规则,则可以详细解析,否则就简单解析状态码
ResponseEntity<String> response = restTemplate.getForEntity(url, String.class);
if (response.getStatusCodeValue() >= 200 && response.getStatusCodeValue() <300) {
//成功
log.info("定时任务[" + taskId + "]调用完成");
} else {
//失败
log.error("任务[" + taskId + ":" + url + "]执行失败," + response.toString());
}
} catch (Throwable t) {
log.error("任务[" + taskId + ":" + url + "]执行异常", t);
//FIXME 可以在此处加上监控或通知,也可以抛出此部分异常,用aop统一处理
}
}
/**
* 新增或修改任务回调
* @param taskId
* @param callbackUrl
*/
static String addOrUpdateTask(String taskId, String callbackUrl) {
return callbackInfo.put(taskId, callbackUrl);
}
/**
* 删除任务回调
* @param taskId
* @return
*/
static String removeTask(String taskId) {
return callbackInfo.remove(taskId);
}
}


最后

以上就是傻傻萝莉为你收集整理的基于springboot和quartz的任务调度系统的全部内容,希望文章能够帮你解决基于springboot和quartz的任务调度系统所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(27)

评论列表共有 0 条评论

立即
投稿
返回
顶部