概述
一、简述
项目地址:https://github.com/littlechare/job-manager
项目详细的信息请参看项目的README
下载地址:https://download.csdn.net/download/w172087242/10333176
之前想写一个关于定时任务的组建,便于项目中快速集成;
写完之后,又继续在此基础上演变成系统,当然该系统还缺少很多东西;
如业务方的bid生成与验证、任务回调响应报文的规则、任务调度的监控、如何做HA等等;
因为是个人项目,所以用了部分下班时间和本次清明节的空闲时间实现,
所以项目可定也有不足的地方,但是如果开发者想使用,也可以下载下来快速
集成,也可在此基础上加以修改。
二、项目功能简述
2.1 任务类型支持说明
任务类型 | 说明 | 表达式示例 | 备注 |
---|---|---|---|
CRON | Cron类型的任务,不支持表达式更改 | 0/20 * * * * *? | 每20秒调用一次 |
TRIGGER | Trigger类型的任务,支持表达式更改 | 0 0 4 * * *? | 每天凌晨4点调用 |
FIXED | Fixed类型任务,不支持表达式更改 | fixed=5000 | 单位毫秒,表示每5秒调用一次 |
FIXED_DELAY | Fixed_delay任务,不支持表达式更改 | fixed=5000,delay=10000 | 单位毫秒,表示延迟10秒后执行 |
2.2 业务方说明
1.业务方需要自己维护业务方编号(bid)和任务编号;
2.支持任务的随时取消、随时更改、随时添加等,回调地址一http(s)打头
2.3 模块说明与类说明
模块名称 | 功能 | 说明 |
---|---|---|
job-dispatcher-base | 任务基础模块 | 定时任务能力提供者(可抽离成组件) |
job-dispatcher-biz | 任务业务模块 | 业务管理与任务触发模块 |
job-dispatcher-manager | 任务管理模块 | 管理任务变动并发出变动事件提醒(分布部署可基于消息进行改进) |
job-dispatcher-test | 测试模块 | 独立模块,含有一个接口,便于测试任务回调 |
类名(简要名称,前缀省略com.littlehow.job) | 功能 | 说明 |
---|---|---|
base.config.ScheduleTaskConfig | 定时任务配置器 | 任务的核心管理器 |
base.config.TaskConfig | 定时任务缓存 | 缓存任务信息 |
base.config.TaskType | 定时任务类型 | 枚举CRON,TRIGGER,FIXED,FIXED_DELAY |
base.BaseTask | 任务基础信息 | 任务实体bean,实现runnable和initializingbean |
cron基础类 | 因为csdn编辑器问题,类名写于此BaseCronTask | |
base.BaseFixedTask | fixed基础类 | 实现baseTask类,与cron类似 |
ScheduleBaseService | service类 | 对外暴露服务类 |
类名(要名简称,前缀省略com.littlehow.job) | 功能 | 说明 |
---|---|---|
manager.api.TaskManagerService | 任务持久化管理api | manager模块可以独立出去,所以该api为唯一访问接口 |
manager.event.TaskEvent | 任务事件 | 任务发送的事件实体,继承ApplicationEvent |
manager.event.TaskEventType | 任务事件类型 | 枚举ADD,UPDATE,REMOVE |
manager.support.MysqlTaskManagerSupport | api实现 | mysql持久化任务的实现 |
类名(要名简称,前缀省略com.littlehow.job) | 功能 | 说明 |
---|---|---|
advice.ExceptionAdvice | 异常统一处理 | |
advice.SuccessResponseAdvice | 成功统一处理 | |
config.InterceptorConfig | 拦截器配置 | |
interceptor.BusinessInterceptor | 拦截器实现 | 主要运用验证和获取header中的bid |
interceptor.BusinessContext | 上下文 | bid上下文管理 |
listener.TaskEventListener | 任务事件监听 | 监听任务事件,实现ApplicationListener接口 |
service.CallbackExecuteService | 回调处理服务 | 定时任务的回调地址调用 |
service.TaskExecuteService | 任务处理服务 | |
controller.TaskManagerController | 任务管理web接口 | 支持任务的增删改查 |
init.InitTask | 初始化任务 | 任务系统重启后需要重新将已有任务加入任务队列 |
vo.ResponseEntity | 相应参数 | |
vo.ArgumentsException | 参数异常 | 校验参数时,如果不合法会抛出此异常 |
JobStart | 启动类 | springboot项目启动类 |
类名(要名简称,前缀省略com.littlehow.job) | 功能 | 说明 |
---|---|---|
test.TestController | 测试controller | |
test.TestStart | 测试启动类 | springboot启动类 |
三、任务系统代码示例;
3.1 任务配置类(ScheduleTaskConfig)
package com.littlehow.job.base.config;
import com.littlehow.job.base.BaseTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.*;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import java.util.Date;
@Lazy(false)
@Configurable
@Component
@EnableScheduling
public class ScheduleTaskConfig implements SchedulingConfigurer {
private Logger log = LoggerFactory.getLogger(ScheduleTaskConfig.class);
private final static String TASK_NOT_EXISTS = "not exists";
private final static String TASK_EXISTS = "exists";
private final static String FAILURE = "failure";
private final static String SUCCESS = "success";
private ScheduledTaskRegistrar scheduledTaskRegistrar;
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
this.scheduledTaskRegistrar = scheduledTaskRegistrar;
initTask();
}
/**
* 添加任务
* @param task
* @return
*/
public String addTask(BaseTask task) {
if (scheduledTaskRegistrar == null || task == null) {
return FAILURE;
}
if (TaskConfig.containsTask(task.getId())) {
return TASK_EXISTS;
}
try {
addTask0(task);
TaskConfig.addTask(task);
return SUCCESS;
} catch (Exception e) {
log.error("新增定时任务失败:" + task, e);
throw e;
}
}
/**
* 改变任务执行频率
* @param taskId
* @param expression
* @return
*/
public String changeTask(String taskId, String expression) {
BaseTask baseTask = TaskConfig.getTask(taskId);
if (baseTask == null || TaskType.TRIGGER != baseTask.taskType || expression == null) {
return TASK_NOT_EXISTS;
}
log.info("change trigger expression:(id=" + taskId + ",expression=" + expression+")");
baseTask.setExpression(expression);
return SUCCESS;
}
/**
* 取消定时任务
* @param taskId
* @return
*/
public String cancelTask(String taskId) {
if (!TaskConfig.containsTask(taskId)) {
return TASK_NOT_EXISTS;
}
try {
log.info("cancel task:" + taskId);
TaskConfig.removeTask(taskId).getScheduledTask().cancel();
} catch (Exception e) {
log.error("取消任务失败:" + taskId, e);
throw e;
}
return SUCCESS;
}
/**
* 初始化已配置任务
*/
private void initTask() {
TaskConfig.getTasks().forEach(task -> addTask0(task));
}
private void addTask0(BaseTask task) {
log.info("add task:" + task);
switch (task.taskType) {
case TRIGGER: task.setScheduledTask(addTriggerTask(task));
break;
case CRON: task.setScheduledTask(addCronTask(task, task.getExpression()));
break;
case FIXED_RATE: task.setScheduledTask(addFixedRateTask(task, task.interval()));
break;
case FIXED_DELAY: task.setScheduledTask(addFixedDelayTask(task, task.interval(), task.delay()));
break;
default:
}
}
/**
* 添加不可改变时间表的定时任务
* @param task
*/
private ScheduledTask addCronTask(Runnable task, String expression) {
return scheduledTaskRegistrar.scheduleCronTask(new CronTask(task, expression));
}
/**
* 添加可变时间task
* @param task
* @return
*/
private ScheduledTask addTriggerTask(BaseTask task) {
return scheduledTaskRegistrar.scheduleTriggerTask(new TriggerTask(task, triggerContext -> {
CronTrigger trigger = new CronTrigger(task.getExpression());
Date nextExec = trigger.nextExecutionTime(triggerContext);
return nextExec;
}));
}
/**
* 设置固定频率的定时任务
* @param task
* @param interval
*/
private ScheduledTask addFixedRateTask(Runnable task, long interval) {
return scheduledTaskRegistrar.scheduleFixedRateTask(new IntervalTask(task, interval, 0L));
}
/**
* 设置延迟以固定频率执行的定时任务
* @param task
* @param interval
* @param delay
*/
private ScheduledTask addFixedDelayTask(Runnable task, long interval, long delay) {
return scheduledTaskRegistrar.scheduleFixedDelayTask(new IntervalTask(task, interval, delay));
}
}
3.2 任务基本实体类(BaseTask)
package com.littlehow.job.base;
import com.littlehow.job.base.config.TaskConfig;
import com.littlehow.job.base.config.TaskType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.config.ScheduledTask;
/**
* 基本的任务配置类
*/
public abstract class BaseTask implements Runnable,InitializingBean {
protected final Logger log = LoggerFactory.getLogger(this.getClass());
public final TaskType taskType;
private ScheduledTask scheduledTask;
private final String id;
public BaseTask(TaskType taskType, String id) {
this.taskType = taskType;
this.id = id;
}
/**
*
获取任务表达式如:0 0 0/1 * * *? (每个整点执行)
* @return
*/
public abstract String getExpression();
/**
* 固定频率执行的时间间隔
* @return
*/
public abstract long interval();
/**
* 固定频率执行的延迟时间
* @return
*/
public abstract long delay();
/**
* 设置任务表达式
* @param expression
*/
public abstract void setExpression(String expression);
/**
* 获取任务唯一标识
* @return
*/
public String getId() {
return id;
}
public final ScheduledTask getScheduledTask() {
return scheduledTask;
}
public final void setScheduledTask(ScheduledTask scheduledTask) {
this.scheduledTask = scheduledTask;
}
@Override
public void afterPropertiesSet() {
TaskConfig.addTask(this);
}
public String toString() {
return this.getClass().getSimpleName() + "(id:" + id + ",expression:" + getExpression()
+ ",type:" + taskType + ",interval:" + interval()
+ ", delay:" + delay() + ")";
}
}
3.3 任务执行类(TaskExecuteService)
package com.littlehow.job.service;
import com.littlehow.job.base.BaseCronTask;
import com.littlehow.job.base.BaseFixedTask;
import com.littlehow.job.base.ScheduleBaseService;
import com.littlehow.job.manager.pojo.TaskDto;
import com.littlehow.job.manager.pojo.TaskType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Service
public class TaskExecuteService {
@Autowired
private ScheduleBaseService scheduleBaseService;
@Autowired
private CallbackExecuteService executeService;
private static final Pattern fixed = Pattern.compile("fixed=(\d+)\s*(,\s*delay=(\d+))?");
/**
* 新增任务
* 因表达式解析错误可能会抛出异常
* @param taskDto
*/
public void addTask(TaskDto taskDto) {
if (taskDto == null) {
return;
}
String taskId = getTaskId(taskDto);
CallbackExecuteService.addOrUpdateTask(taskId, taskDto.getCallbackUrl());
TaskType taskType = taskDto.getTaskType();
switch (taskType) {
case FIXED:
case FIXED_DELAY:
long[] params = getFixedParameters(taskDto.getExpression());
BaseFixedTask baseFixedTask = new BaseFixedTask(taskId,
params[0], params[1]) {
@Override
public void run() {
executeService.execute(this.getId());
}
};
scheduleBaseService.addTask(baseFixedTask);
break;
case CRON:
case TRIGGER:
BaseCronTask baseCronTask = new BaseCronTask(taskType == TaskType.CRON ?
com.littlehow.job.base.config.TaskType.CRON :
com.littlehow.job.base.config.TaskType.TRIGGER, taskId) {
@Override
public void run() {
executeService.execute(this.getId());
}
};
baseCronTask.setExpression(taskDto.getExpression());
scheduleBaseService.addTask(baseCronTask);
}
}
/**
* 修改任务
* @param taskDto
*/
public void updateTask(TaskDto taskDto) {
if (!StringUtils.isEmpty(taskDto.getExpression())) {
//修改表达式
scheduleBaseService.changeTask(getTaskId(taskDto), taskDto.getExpression());
}
if (!StringUtils.isEmpty(taskDto.getCallbackUrl())) {
//修改回调地址
CallbackExecuteService.addOrUpdateTask(getTaskId(taskDto), taskDto.getCallbackUrl());
}
}
/**
* 删除任务
* @param taskDto
*/
public void removeTask(TaskDto taskDto) {
String taskId = getTaskId(taskDto);
scheduleBaseService.removeTask(taskId);
CallbackExecuteService.removeTask(taskId);
}
/**
* 获取fixed类型的实际参数信息
* @param expression
* @return
*/
private static long[] getFixedParameters(String expression) {
Matcher matcher = fixed.matcher(expression);
if (matcher.find()) {
long[] fixedParam = {0L, 0L};
fixedParam[0] = Long.parseLong(matcher.group(1));
String delay = matcher.group(3);
if (delay != null) {
fixedParam[1] = Long.parseLong(delay);
}
return fixedParam;
}
throw new IllegalArgumentException("invalid expression:" +expression);
}
/**
* 获取任务编号
* @param task
* @return
*/
private String getTaskId(TaskDto task) {
return task.getBusinessId() + "-" + task.getTaskId();
}
}
3.4 任务回调地址处理类(CallbackExecuteService)
package com.littlehow.job.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.util.HashMap;
import java.util.Map;
/**
* 执行调用定时任务的回调地址
* 可以约定定时任务的回执报文,便于进一步解析
* littlehow 2018/4/7
*/
@Service
public class CallbackExecuteService {
private final static Logger log = LoggerFactory.getLogger(CallbackExecuteService.class);
/**
* 缓存任务与回调之间的关系
*/
private static final Map<String, String> callbackInfo = new HashMap<>();
/**
* http任务处理接口
*/
private RestTemplate restTemplate = new RestTemplate();
/**
* 执行远程任务
* @param taskId
*/
public void execute(String taskId) {
String url = callbackInfo.get(taskId);
if (url == null) {
log.error("任务[" + taskId + "]的回调地址为空");
//FIXME 可以在此处加上监控或通知,也可以已异常的形式抛出,用aop统一处理
return;
}
try {
//FIXME 如果制定了返回值规则,则可以详细解析,否则就简单解析状态码
ResponseEntity<String> response = restTemplate.getForEntity(url, String.class);
if (response.getStatusCodeValue() >= 200 && response.getStatusCodeValue() <300) {
//成功
log.info("定时任务[" + taskId + "]调用完成");
} else {
//失败
log.error("任务[" + taskId + ":" + url + "]执行失败," + response.toString());
}
} catch (Throwable t) {
log.error("任务[" + taskId + ":" + url + "]执行异常", t);
//FIXME 可以在此处加上监控或通知,也可以抛出此部分异常,用aop统一处理
}
}
/**
* 新增或修改任务回调
* @param taskId
* @param callbackUrl
*/
static String addOrUpdateTask(String taskId, String callbackUrl) {
return callbackInfo.put(taskId, callbackUrl);
}
/**
* 删除任务回调
* @param taskId
* @return
*/
static String removeTask(String taskId) {
return callbackInfo.remove(taskId);
}
}
最后
以上就是傻傻萝莉为你收集整理的基于springboot和quartz的任务调度系统的全部内容,希望文章能够帮你解决基于springboot和quartz的任务调度系统所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复