一、简述
项目地址:https://github.com/littlechare/job-manager
项目详细的信息请参看项目的README
下载地址:https://download.csdn.net/download/w172087242/10333176
之前想写一个关于定时任务的组建,便于项目中快速集成;
写完之后,又继续在此基础上演变成系统,当然该系统还缺少很多东西;
如业务方的bid生成与验证、任务回调响应报文的规则、任务调度的监控、如何做HA等等;
因为是个人项目,所以用了部分下班时间和本次清明节的空闲时间实现,
所以项目可定也有不足的地方,但是如果开发者想使用,也可以下载下来快速
集成,也可在此基础上加以修改。
二、项目功能简述
2.1 任务类型支持说明
任务类型 | 说明 | 表达式示例 | 备注 |
---|---|---|---|
CRON | Cron类型的任务,不支持表达式更改 | 0/20 * * * * *? | 每20秒调用一次 |
TRIGGER | Trigger类型的任务,支持表达式更改 | 0 0 4 * * *? | 每天凌晨4点调用 |
FIXED | Fixed类型任务,不支持表达式更改 | fixed=5000 | 单位毫秒,表示每5秒调用一次 |
FIXED_DELAY | Fixed_delay任务,不支持表达式更改 | fixed=5000,delay=10000 | 单位毫秒,表示延迟10秒后执行 |
2.2 业务方说明
1.业务方需要自己维护业务方编号(bid)和任务编号;
2.支持任务的随时取消、随时更改、随时添加等,回调地址一http(s)打头
2.3 模块说明与类说明
模块名称 | 功能 | 说明 |
---|---|---|
job-dispatcher-base | 任务基础模块 | 定时任务能力提供者(可抽离成组件) |
job-dispatcher-biz | 任务业务模块 | 业务管理与任务触发模块 |
job-dispatcher-manager | 任务管理模块 | 管理任务变动并发出变动事件提醒(分布部署可基于消息进行改进) |
job-dispatcher-test | 测试模块 | 独立模块,含有一个接口,便于测试任务回调 |
类名(简要名称,前缀省略com.littlehow.job) | 功能 | 说明 |
---|---|---|
base.config.ScheduleTaskConfig | 定时任务配置器 | 任务的核心管理器 |
base.config.TaskConfig | 定时任务缓存 | 缓存任务信息 |
base.config.TaskType | 定时任务类型 | 枚举CRON,TRIGGER,FIXED,FIXED_DELAY |
base.BaseTask | 任务基础信息 | 任务实体bean,实现runnable和initializingbean |
复制代码
| cron基础类 | 因为csdn编辑器问题,类名写于此BaseCronTask |
base.BaseFixedTask | fixed基础类 | 实现baseTask类,与cron类似 |
ScheduleBaseService | service类 | 对外暴露服务类 |
类名(要名简称,前缀省略com.littlehow.job) | 功能 | 说明 |
---|---|---|
manager.api.TaskManagerService | 任务持久化管理api | manager模块可以独立出去,所以该api为唯一访问接口 |
manager.event.TaskEvent | 任务事件 | 任务发送的事件实体,继承ApplicationEvent |
manager.event.TaskEventType | 任务事件类型 | 枚举ADD,UPDATE,REMOVE |
manager.support.MysqlTaskManagerSupport | api实现 | mysql持久化任务的实现 |
类名(要名简称,前缀省略com.littlehow.job) | 功能 | 说明 |
---|---|---|
advice.ExceptionAdvice | 异常统一处理 | |
advice.SuccessResponseAdvice | 成功统一处理 | |
config.InterceptorConfig | 拦截器配置 | |
interceptor.BusinessInterceptor | 拦截器实现 | 主要运用验证和获取header中的bid |
interceptor.BusinessContext | 上下文 | bid上下文管理 |
listener.TaskEventListener | 任务事件监听 | 监听任务事件,实现ApplicationListener接口 |
service.CallbackExecuteService | 回调处理服务 | 定时任务的回调地址调用 |
service.TaskExecuteService | 任务处理服务 | |
controller.TaskManagerController | 任务管理web接口 | 支持任务的增删改查 |
init.InitTask | 初始化任务 | 任务系统重启后需要重新将已有任务加入任务队列 |
vo.ResponseEntity | 相应参数 | |
vo.ArgumentsException | 参数异常 | 校验参数时,如果不合法会抛出此异常 |
JobStart | 启动类 | springboot项目启动类 |
类名(要名简称,前缀省略com.littlehow.job) | 功能 | 说明 |
---|---|---|
test.TestController | 测试controller | |
test.TestStart | 测试启动类 | springboot启动类 |
三、任务系统代码示例;
3.1 任务配置类(ScheduleTaskConfig)
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140package com.littlehow.job.base.config; import com.littlehow.job.base.BaseTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Configurable; import org.springframework.context.annotation.Lazy; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.config.*; import org.springframework.scheduling.support.CronTrigger; import org.springframework.stereotype.Component; import java.util.Date; @Lazy(false) @Configurable @Component @EnableScheduling public class ScheduleTaskConfig implements SchedulingConfigurer { private Logger log = LoggerFactory.getLogger(ScheduleTaskConfig.class); private final static String TASK_NOT_EXISTS = "not exists"; private final static String TASK_EXISTS = "exists"; private final static String FAILURE = "failure"; private final static String SUCCESS = "success"; private ScheduledTaskRegistrar scheduledTaskRegistrar; @Override public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) { this.scheduledTaskRegistrar = scheduledTaskRegistrar; initTask(); } /** * 添加任务 * @param task * @return */ public String addTask(BaseTask task) { if (scheduledTaskRegistrar == null || task == null) { return FAILURE; } if (TaskConfig.containsTask(task.getId())) { return TASK_EXISTS; } try { addTask0(task); TaskConfig.addTask(task); return SUCCESS; } catch (Exception e) { log.error("新增定时任务失败:" + task, e); throw e; } } /** * 改变任务执行频率 * @param taskId * @param expression * @return */ public String changeTask(String taskId, String expression) { BaseTask baseTask = TaskConfig.getTask(taskId); if (baseTask == null || TaskType.TRIGGER != baseTask.taskType || expression == null) { return TASK_NOT_EXISTS; } log.info("change trigger expression:(id=" + taskId + ",expression=" + expression+")"); baseTask.setExpression(expression); return SUCCESS; } /** * 取消定时任务 * @param taskId * @return */ public String cancelTask(String taskId) { if (!TaskConfig.containsTask(taskId)) { return TASK_NOT_EXISTS; } try { log.info("cancel task:" + taskId); TaskConfig.removeTask(taskId).getScheduledTask().cancel(); } catch (Exception e) { log.error("取消任务失败:" + taskId, e); throw e; } return SUCCESS; } /** * 初始化已配置任务 */ private void initTask() { TaskConfig.getTasks().forEach(task -> addTask0(task)); } private void addTask0(BaseTask task) { log.info("add task:" + task); switch (task.taskType) { case TRIGGER: task.setScheduledTask(addTriggerTask(task)); break; case CRON: task.setScheduledTask(addCronTask(task, task.getExpression())); break; case FIXED_RATE: task.setScheduledTask(addFixedRateTask(task, task.interval())); break; case FIXED_DELAY: task.setScheduledTask(addFixedDelayTask(task, task.interval(), task.delay())); break; default: } } /** * 添加不可改变时间表的定时任务 * @param task */ private ScheduledTask addCronTask(Runnable task, String expression) { return scheduledTaskRegistrar.scheduleCronTask(new CronTask(task, expression)); } /** * 添加可变时间task * @param task * @return */ private ScheduledTask addTriggerTask(BaseTask task) { return scheduledTaskRegistrar.scheduleTriggerTask(new TriggerTask(task, triggerContext -> { CronTrigger trigger = new CronTrigger(task.getExpression()); Date nextExec = trigger.nextExecutionTime(triggerContext); return nextExec; })); } /** * 设置固定频率的定时任务 * @param task * @param interval */ private ScheduledTask addFixedRateTask(Runnable task, long interval) { return scheduledTaskRegistrar.scheduleFixedRateTask(new IntervalTask(task, interval, 0L)); } /** * 设置延迟以固定频率执行的定时任务 * @param task * @param interval * @param delay */ private ScheduledTask addFixedDelayTask(Runnable task, long interval, long delay) { return scheduledTaskRegistrar.scheduleFixedDelayTask(new IntervalTask(task, interval, delay)); } }
3.2 任务基本实体类(BaseTask)
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64package com.littlehow.job.base; import com.littlehow.job.base.config.TaskConfig; import com.littlehow.job.base.config.TaskType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.scheduling.config.ScheduledTask; /** * 基本的任务配置类 */ public abstract class BaseTask implements Runnable,InitializingBean { protected final Logger log = LoggerFactory.getLogger(this.getClass()); public final TaskType taskType; private ScheduledTask scheduledTask; private final String id; public BaseTask(TaskType taskType, String id) { this.taskType = taskType; this.id = id; } /** * 获取任务表达式如:0 0 0/1 * * *? (每个整点执行) * @return */ public abstract String getExpression(); /** * 固定频率执行的时间间隔 * @return */ public abstract long interval(); /** * 固定频率执行的延迟时间 * @return */ public abstract long delay(); /** * 设置任务表达式 * @param expression */ public abstract void setExpression(String expression); /** * 获取任务唯一标识 * @return */ public String getId() { return id; } public final ScheduledTask getScheduledTask() { return scheduledTask; } public final void setScheduledTask(ScheduledTask scheduledTask) { this.scheduledTask = scheduledTask; } @Override public void afterPropertiesSet() { TaskConfig.addTask(this); } public String toString() { return this.getClass().getSimpleName() + "(id:" + id + ",expression:" + getExpression() + ",type:" + taskType + ",interval:" + interval() + ", delay:" + delay() + ")"; } }
3.3 任务执行类(TaskExecuteService)
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108package com.littlehow.job.service; import com.littlehow.job.base.BaseCronTask; import com.littlehow.job.base.BaseFixedTask; import com.littlehow.job.base.ScheduleBaseService; import com.littlehow.job.manager.pojo.TaskDto; import com.littlehow.job.manager.pojo.TaskType; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import java.util.regex.Matcher; import java.util.regex.Pattern; @Service public class TaskExecuteService { @Autowired private ScheduleBaseService scheduleBaseService; @Autowired private CallbackExecuteService executeService; private static final Pattern fixed = Pattern.compile("fixed=(\d+)\s*(,\s*delay=(\d+))?"); /** * 新增任务 * 因表达式解析错误可能会抛出异常 * @param taskDto */ public void addTask(TaskDto taskDto) { if (taskDto == null) { return; } String taskId = getTaskId(taskDto); CallbackExecuteService.addOrUpdateTask(taskId, taskDto.getCallbackUrl()); TaskType taskType = taskDto.getTaskType(); switch (taskType) { case FIXED: case FIXED_DELAY: long[] params = getFixedParameters(taskDto.getExpression()); BaseFixedTask baseFixedTask = new BaseFixedTask(taskId, params[0], params[1]) { @Override public void run() { executeService.execute(this.getId()); } }; scheduleBaseService.addTask(baseFixedTask); break; case CRON: case TRIGGER: BaseCronTask baseCronTask = new BaseCronTask(taskType == TaskType.CRON ? com.littlehow.job.base.config.TaskType.CRON : com.littlehow.job.base.config.TaskType.TRIGGER, taskId) { @Override public void run() { executeService.execute(this.getId()); } }; baseCronTask.setExpression(taskDto.getExpression()); scheduleBaseService.addTask(baseCronTask); } } /** * 修改任务 * @param taskDto */ public void updateTask(TaskDto taskDto) { if (!StringUtils.isEmpty(taskDto.getExpression())) { //修改表达式 scheduleBaseService.changeTask(getTaskId(taskDto), taskDto.getExpression()); } if (!StringUtils.isEmpty(taskDto.getCallbackUrl())) { //修改回调地址 CallbackExecuteService.addOrUpdateTask(getTaskId(taskDto), taskDto.getCallbackUrl()); } } /** * 删除任务 * @param taskDto */ public void removeTask(TaskDto taskDto) { String taskId = getTaskId(taskDto); scheduleBaseService.removeTask(taskId); CallbackExecuteService.removeTask(taskId); } /** * 获取fixed类型的实际参数信息 * @param expression * @return */ private static long[] getFixedParameters(String expression) { Matcher matcher = fixed.matcher(expression); if (matcher.find()) { long[] fixedParam = {0L, 0L}; fixedParam[0] = Long.parseLong(matcher.group(1)); String delay = matcher.group(3); if (delay != null) { fixedParam[1] = Long.parseLong(delay); } return fixedParam; } throw new IllegalArgumentException("invalid expression:" +expression); } /** * 获取任务编号 * @param task * @return */ private String getTaskId(TaskDto task) { return task.getBusinessId() + "-" + task.getTaskId(); } }
3.4 任务回调地址处理类(CallbackExecuteService)
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68package com.littlehow.job.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; import java.util.HashMap; import java.util.Map; /** * 执行调用定时任务的回调地址 * 可以约定定时任务的回执报文,便于进一步解析 * littlehow 2018/4/7 */ @Service public class CallbackExecuteService { private final static Logger log = LoggerFactory.getLogger(CallbackExecuteService.class); /** * 缓存任务与回调之间的关系 */ private static final Map<String, String> callbackInfo = new HashMap<>(); /** * http任务处理接口 */ private RestTemplate restTemplate = new RestTemplate(); /** * 执行远程任务 * @param taskId */ public void execute(String taskId) { String url = callbackInfo.get(taskId); if (url == null) { log.error("任务[" + taskId + "]的回调地址为空"); //FIXME 可以在此处加上监控或通知,也可以已异常的形式抛出,用aop统一处理 return; } try { //FIXME 如果制定了返回值规则,则可以详细解析,否则就简单解析状态码 ResponseEntity<String> response = restTemplate.getForEntity(url, String.class); if (response.getStatusCodeValue() >= 200 && response.getStatusCodeValue() <300) { //成功 log.info("定时任务[" + taskId + "]调用完成"); } else { //失败 log.error("任务[" + taskId + ":" + url + "]执行失败," + response.toString()); } } catch (Throwable t) { log.error("任务[" + taskId + ":" + url + "]执行异常", t); //FIXME 可以在此处加上监控或通知,也可以抛出此部分异常,用aop统一处理 } } /** * 新增或修改任务回调 * @param taskId * @param callbackUrl */ static String addOrUpdateTask(String taskId, String callbackUrl) { return callbackInfo.put(taskId, callbackUrl); } /** * 删除任务回调 * @param taskId * @return */ static String removeTask(String taskId) { return callbackInfo.remove(taskId); } }
最后
以上就是傻傻萝莉最近收集整理的关于基于springboot和quartz的任务调度系统的全部内容,更多相关基于springboot和quartz内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复