我是靠谱客的博主 完美秋天,最近开发中收集的这篇文章主要介绍Quartz使用(6) - Quartz项目实战,觉得挺不错的,现在分享给大家,希望可以做个参考。



1. 背景


a. 调用接口的任务均从mongo数据库读取;

b. 任务的个数随着业务量的增加而增加;

c. 每个调用任务的定时执行时间可能不同,且定时执行时间在mongo中可配置;

d. 任务的执行需要动态更新,如检测到某一任务的定时时间发生变化,则任务的执行也需要实时修改;

e. mongo、redis、elasticsearch等数据库中所存储的字段也由mongo进行配置;

f. 任务执行需要实时性较高、可靠性较强、可扩展性较高等。


2. 框架


1) 首先构建从mongo加载任务

2) 将任务的配置信息初始化至Quartz

3) 通过Quartz的Job任务实现定时调用下载接口任务

4) 将下载的数据依据配置,存储至数据库中

5) 定时检测任务通过定时扫描mongo数据库,查看相关任务信息的配置是否发生变化,如果发生变化,则进行动态更新

6) 为了实现高可用性、可扩展性,可以直接使用Quartz原生的集群特性。

3. 核心代码


3.1 任务主流程  

import com.quartz.conf.Configuration;
import com.quartz.conf.OcpConfHelper;
import com.quartz.module.TaskInfo;
import org.apache.log4j.PropertyConfigurator;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class SchedulerRunner {
static Logger logger = LoggerFactory.getLogger(SchedulerRunner.class);
public static void main(String[] args) {
// 加载日志配置文件
// 加载quartz配置文件
System.setProperty("org.quartz.properties", "./conf/quartz.properties");
// 执行任务解析与调度

public static void run(){
// 获取配置信息表
List<TaskInfo> taskInfos = GenerateTaskInfo.generateTaskInfoFromMysql();
if(taskInfos.size() == 0){
logger.info("there is no tasks from mongoInfo");
// 过滤下线任务
taskInfos = GenerateTaskInfo.filterTask(taskInfos);
if(taskInfos.size() == 0){
logger.info("all tasks if offline, no need to run");
Scheduler scheduler = null;
try {
scheduler = StdSchedulerFactory.getDefaultScheduler();
} catch (SchedulerException e) {
if(scheduler == null){
logger.error("create scheduler failed");
// 加入任务调度
for(TaskInfo task : taskInfos){
SchedulerFactory.addJob2Scheduler(task, scheduler);
// 加入动态更新任务

// 开启任务
try {
} catch (SchedulerException e) {
logger.error("start scheduler error!");
public static void clearSchedulerJob(Scheduler scheduler){
try {
} catch (SchedulerException e) {
scheduler error!");
* 基于配置文件中的信息,加载调度器开始运行时的清洗标识
* @return
private static boolean isSchedulerClear(){
Configuration conf = OcpConfHelper.getInstance().getOcpConf();
return conf.getBooleanValue("cleanSchedulerFlag", "true");
View Code

3.2 封装任务对象

import java.util.List;
import java.util.Map;
public class TaskInfo {
protected String categoryId; // 业务Id
protected String categoryName; // 业务名称
protected String sourceId; // 信源Id
protected String sourceName; // 信源名称
protected int sourceStatus; // 信源状态
protected String pipelineConf; // 信源pipeline配置信息
protected List<String> dbStoreTypes; // 业务的存储类型
protected String esConfInfo; // ES存储配置
protected String dbConfInfo; // DB存储配置
protected String cronInfo; // 定时任务信息
protected int sourceType; // 实时更新还是离线更新
protected List<String> indexBuildEles; // 更新索引的信息
protected List<String> idBuildEles; // id的构建因素
protected String indexType; // 全量或增量
protected String categoryLevel1; // 一级分类
protected String zhName; // 中文信息
protected Map<String,String> outputType; //输出参数名及其类型
protected String providerName;
protected String functionName; //category_function名称
public String getProviderName() {
return providerName;
public void setProviderName(String providerName) {
this.providerName = providerName;
public String getCategoryId() {
return categoryId;
public void setCategoryId(String categoryId) {
this.categoryId = categoryId;
public String getCategoryName() {
return categoryName;
public void setCategoryName(String categoryName) {
this.categoryName = categoryName;
public String getSourceId() {
return sourceId;
public void setSourceId(String sourceId) {
this.sourceId = sourceId;
public String getSourceName() {
return sourceName;
public void setSourceName(String sourceName) {
this.sourceName = sourceName;
public int getSourceStatus() {
return sourceStatus;
public void setSourceStatus(int sourceStatus) {
this.sourceStatus = sourceStatus;
public String getPipelineConf() {
return pipelineConf;
public void setPipelineConf(String pipelineConf) {
this.pipelineConf = pipelineConf;
public String getEsConfInfo() {
return esConfInfo;
public void setEsConfInfo(String esConfInfo) {
this.esConfInfo = esConfInfo;
public String getDbConfInfo() {
return dbConfInfo;
public void setDbConfInfo(String dbConfInfo) {
this.dbConfInfo = dbConfInfo;
public String getCronInfo() {
return cronInfo;
public void setCronInfo(String cronInfo) {
this.cronInfo = cronInfo;
public int getSourceType() {
return sourceType;
public void setSourceType(int sourceType) {
this.sourceType = sourceType;
public List<String> getIdBuildEles() {
return idBuildEles;
public void setIdBuildEles(List<String> idBuildEles) {
this.idBuildEles = idBuildEles;
public List<String> getIndexBuildEles() {
return indexBuildEles;
public void setIndexBuildEles(List<String> indexBuildEles) {
this.indexBuildEles = indexBuildEles;
public String getIndexType() {
return indexType;
public void setIndexType(String indexType) {
this.indexType = indexType;
public String getCategoryLevel1() {
return categoryLevel1;
public void setCategoryLevel1(String categoryLevel1) {
this.categoryLevel1 = categoryLevel1;
public String getZhName() {
return zhName;
public void setZhName(String zhName) {
this.zhName = zhName;
public TaskInfo(){}
public List<String> getDbStoreTypes() {
return dbStoreTypes;
public void setDbStoreTypes(List<String> dbStoreTypes) {
this.dbStoreTypes = dbStoreTypes;
public Map<String, String> getOutputType() {
return outputType;
public void setOutputType(Map<String, String> outputType) {
this.outputType = outputType;
public String getFunctionName() {
return functionName;
public void setFunctionName(String functionName) {
this.functionName = functionName;
* 是否有相同的定时信息
* @param taskInfo
* @return
public boolean hasSameCronInfo(TaskInfo taskInfo){
if(taskInfo == null) return false;
return this.getCronInfo().equalsIgnoreCase(taskInfo.getCronInfo());
public String toString() {
return "TaskInfo{" +
"categoryId='" + categoryId + ''' +
", categoryName='" + categoryName + ''' +
", sourceId='" + sourceId + ''' +
", sourceName='" + sourceName + ''' +
", sourceStatus=" + sourceStatus +
", pipelineConf='" + pipelineConf + ''' +
", dbStoreTypes=" + dbStoreTypes +
", esConfInfo='" + esConfInfo + ''' +
", dbConfInfo='" + dbConfInfo + ''' +
", cronInfo='" + cronInfo + ''' +
", sourceType=" + sourceType +
", indexBuildEles=" + indexBuildEles +
", idBuildEles=" + idBuildEles +
", indexType='" + indexType + ''' +
", categoryLevel1='" + categoryLevel1 + ''' +
", zhName='" + zhName + ''' +
", outputType='" + outputType + ''' +
", providerName='" + providerName + ''' +
", functionName='" + functionName + ''' +
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TaskInfo taskInfo = (TaskInfo) o;
if (sourceStatus != taskInfo.sourceStatus) return false;
if (sourceType != taskInfo.sourceType) return false;
if (categoryName != null ? !categoryName.equals(taskInfo.categoryName) : taskInfo.categoryName != null)
return false;
if (sourceName != null ? !sourceName.equals(taskInfo.sourceName) : taskInfo.sourceName != null) return false;
if (providerName != null ? !providerName.equals(taskInfo.providerName) : taskInfo.providerName != null) return false;
if (pipelineConf != null ? !pipelineConf.equals(taskInfo.pipelineConf) : taskInfo.pipelineConf != null)
return false;
if (dbStoreTypes != null ? !dbStoreTypes.equals(taskInfo.dbStoreTypes) : taskInfo.dbStoreTypes != null)
return false;
if (esConfInfo != null ? !esConfInfo.equals(taskInfo.esConfInfo) : taskInfo.esConfInfo != null) return false;
if (dbConfInfo != null ? !dbConfInfo.equals(taskInfo.dbConfInfo) : taskInfo.dbConfInfo != null) return false;
if (cronInfo != null ? !cronInfo.equals(taskInfo.cronInfo) : taskInfo.cronInfo != null) return false;
if (indexBuildEles != null ? !indexBuildEles.equals(taskInfo.indexBuildEles) : taskInfo.indexBuildEles != null)
return false;
if (idBuildEles != null ? !idBuildEles.equals(taskInfo.idBuildEles) : taskInfo.idBuildEles != null)
return false;
if (indexType != null ? !indexType.equals(taskInfo.indexType) : taskInfo.indexType != null) return false;
if (categoryLevel1 != null ? !categoryLevel1.equals(taskInfo.categoryLevel1) : taskInfo.categoryLevel1 != null)
return false;
if (outputType != null ? !outputType.equals(taskInfo.outputType) : taskInfo.outputType != null)
return false;
if (functionName != null ? !functionName.equals(taskInfo.functionName) : taskInfo.functionName != null)
return false;
return zhName != null ? zhName.equals(taskInfo.zhName) : taskInfo.zhName == null;
public int hashCode() {
int result = categoryName != null ? categoryName.hashCode() : 0;
result = 31 * result + (sourceName != null ? sourceName.hashCode() : 0);
result = 31 * result + (providerName != null ? providerName.hashCode() : 0);
result = 31 * result + sourceStatus;
result = 31 * result + (pipelineConf != null ? pipelineConf.hashCode() : 0);
result = 31 * result + (dbStoreTypes != null ? dbStoreTypes.hashCode() : 0);
result = 31 * result + (esConfInfo != null ? esConfInfo.hashCode() : 0);
result = 31 * result + (dbConfInfo != null ? dbConfInfo.hashCode() : 0);
result = 31 * result + (cronInfo != null ? cronInfo.hashCode() : 0);
result = 31 * result + sourceType;
result = 31 * result + (indexBuildEles != null ? indexBuildEles.hashCode() : 0);
result = 31 * result + (idBuildEles != null ? idBuildEles.hashCode() : 0);
result = 31 * result + (indexType != null ? indexType.hashCode() : 0);
result = 31 * result + (categoryLevel1 != null ? categoryLevel1.hashCode() : 0);
result = 31 * result + (zhName != null ? zhName.hashCode() : 0);
result = 31 * result + (outputType != null ? outputType.hashCode() : 0);
result = 31 * result + (functionName != null ? functionName.hashCode() : 0);
return result;
View Code

3.3 任务的构造及初始化

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.quartz.consts.SourceType;
import com.quartz.consts.Sql;
import com.quartz.consts.StatusType;
import com.quartz.module.TaskInfo;
import com.quartz.util.MongoUtil;
import com.quartz.util.MySqlUtil;
import com.quartz.util.TimeUtil;
import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import org.bson.types.ObjectId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.util.*;
* 获取调度任务的相关信息
* Created by songwang4 on 2017/6/7.
public class GenerateTaskInfo {
static Logger logger = LoggerFactory.getLogger(GenerateTaskInfo.class);
static DBCollection sourceColl = MongoUtil.createOcpSourceDB();
static DBCollection categoryColl = MongoUtil.createOcpCategoryDB();
* 从数据库中读取任务相关信息
* @return
public static List<TaskInfo> generateTaskInfoFromMongo() {
// 将任务信息进行封装
List<TaskInfo> tasks = Lists.newArrayList();
TaskInfo task = null;
DBCursor sourceCur = sourceColl.find();
DBObject sourceObj = null;
DBObject categoryObj = null;
while (sourceCur.hasNext()) {
sourceObj = sourceCur.next();
task = new TaskInfo();
String sourceName = sourceObj.get("sourceName").toString();
String categoryName = sourceObj.get("category").toString();
// 基于业务名查找对应的业务表信息
categoryObj = categoryColl.findOne(new BasicDBObject("catName", categoryName));
if (categoryObj == null) {
logger.error("no category found through source: " + sourceName);
task.setCategoryId(categoryObj.get("_id").toString()); // 业务Id
task.setCategoryName(categoryName); // 业务名

List<String> dbStoreTypes = Lists.newArrayList();
if (categoryObj.containsField("storeType")) {
try {
JSONArray storeTypeArr = JSON.parseArray(categoryObj.get("storeType").toString());
for (int i = 0; i < storeTypeArr.size(); i++) {
} catch (Exception e) {
task.setDbStoreTypes(dbStoreTypes); // 存储类型

task.setCategoryLevel1(categoryObj.get("parent").toString()); // 一级业务分类
task.setDbConfInfo(categoryObj.containsField("db") ? categoryObj.get("db").toString() : categoryName); // DB配置
task.setEsConfInfo(categoryObj.containsField("es") ? categoryObj.get("es").toString() : categoryName); // ES配置

task.setIndexBuildEles(extractBuilderEles(categoryObj, "isIndex", "itemName")); // 构建ES索引信息
task.setIdBuildEles(extractBuilderEles(categoryObj, "isGK", "itemName")); // 构建id的信息元素

task.setSourceId(sourceObj.get("_id").toString()); // 信源Id
task.setSourceName(sourceName); // 信源名称
int status = StatusType.OFFLINE;
if (sourceObj.containsField("status")) {
String statusType = sourceObj.get("status").toString();
if (statusType.equals(StatusType.STR_ONLINE)) {
status = StatusType.ONLINE;
task.setSourceStatus(status); // 信源的上下线状态
int sourceType = SourceType.REAL_TIME_PROCESS;
if (sourceObj.containsField("type")) {
String strStatusType = sourceObj.get("type").toString();
if (strStatusType.equals(SourceType.STR_OFF_LINE_PROCESS)) {
sourceType = SourceType.OFF_LINE_PROCESS;
task.setSourceType(sourceType); // 离线或实时处理

task.setIndexType(sourceObj.containsField("indexType") ?
sourceObj.get("indexType").toString() : ""); // 增量或全量标识
// 定时时间配置
task.setCronInfo(sourceObj.containsField("timerInfo") ?
sourceObj.get("timerInfo").toString() : "");
if (task.getCronInfo().trim().length() == 0) {
task.setPipelineConf(sourceObj.containsField("mappingWorkflow") ?
sourceObj.get("mappingWorkflow").toString() : ""); // pipeline配置信息

return tasks;
* 构建生成id或es的信息元素
* @param categoryObj
* @param queryField
* @param retureField
* @return
public static List<String> extractBuilderEles(DBObject categoryObj, String queryField, String retureField) {
List<String> builerEles = Lists.newArrayList();
JSONArray dataItemArr = null;
try {
dataItemArr = JSON.parseArray(categoryObj.get("dataItems").toString());
} catch (JSONException e) {
if (dataItemArr != null && dataItemArr.size() > 0) {
JSONObject dataItemJson = null;
for (int i = 0; i < dataItemArr.size(); i++) {
dataItemJson = dataItemArr.getJSONObject(i);
if (dataItemJson.containsKey(queryField) && dataItemJson.getBoolean(queryField)) {
return builerEles;
* 基于业务表中的信息构造定时任务表达式
* @param sourceObj
* @return
public static String generateCronInfo(DBObject sourceObj) {
String updateTimeType = "";
String updateTimeCycle = "";
if (sourceObj.containsField("updateType")) {
updateTimeType = sourceObj.get("updateType").toString();
if (sourceObj.containsField("updateCycle")) {
updateTimeCycle = sourceObj.get("updateCycle").toString();
if (updateTimeType.trim().length() == 0 || updateTimeCycle.trim().length() == 0) {
return "";
StringBuilder sb = new StringBuilder();
Date date = null;
if (updateTimeType.equalsIgnoreCase("YEAR")) {
date = TimeUtil.parseDate(updateTimeCycle, "MM-dd HH:mm");
if (date == null) {
try {
sb.append(TimeUtil.extractFixedTimeByDay(Integer.parseInt(updateTimeCycle), 0, 0));
} catch (NumberFormatException e) {
} else {
sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")
.append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ")
.append(TimeUtil.extractFixedTime(date, Calendar.DATE)).append(" ")
.append(TimeUtil.extractFixedTime(date, Calendar.MONTH) + 1).append(" ? *");
if (updateTimeType.equalsIgnoreCase("MONTH")) {
date = TimeUtil.parseDate(updateTimeCycle, "dd HH:mm");
if (date == null) return "";
sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")
.append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ")
.append(TimeUtil.extractFixedTime(date, Calendar.DATE)).append(" * ?");
if (updateTimeType.equalsIgnoreCase("DAY")) {
date = TimeUtil.parseDate(updateTimeCycle, "HH:mm");
if (date == null) return "";
sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")
.append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" * * ?");
if (updateTimeType.equalsIgnoreCase("WEEK")) {
String weekDay = "1";
if (sourceObj.containsField("weekDay")) {
weekDay = sourceObj.get("weekDay").toString();
date = TimeUtil.parseDate(updateTimeCycle, "HH:mm");
if (date == null) return "";
sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")
.append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ? * ")
if (updateTimeType.equalsIgnoreCase("HOUR")) {
try {
int hour = Integer.parseInt(updateTimeCycle);
sb.append(TimeUtil.extractFixedTimeByHour(hour, 0));
} catch (NumberFormatException e) {
if (updateTimeType.equalsIgnoreCase("MINUTE")) {
try {
int minute = Integer.parseInt(updateTimeCycle);
} catch (NumberFormatException e) {
if (updateTimeType.equalsIgnoreCase("SECOND")) {
sb.append("*/").append(updateTimeCycle).append(" * * * * ?");
return sb.toString();
* 过滤下线的任务
* @param tasks
* @return
public static List<TaskInfo> filterTask(List<TaskInfo> tasks) {
List<TaskInfo> taskInfos = Lists.newArrayList();
for (TaskInfo taskInfo : tasks) {
// 过滤下线的信源状态或实时的信源
if (taskInfo.getSourceStatus() == StatusType.OFFLINE
|| taskInfo.getSourceType() != SourceType.OFF_LINE_PROCESS) {
return taskInfos;
* 基于业务名称对任务进行分组
* @param oriTasks
* @return
public static Map<String, List<TaskInfo>> groupTaskByCategory(List<TaskInfo> oriTasks) {
Map<String, List<TaskInfo>> categoryTasks = Maps.newHashMap();
for (TaskInfo oriTask : oriTasks) {
if (!categoryTasks.containsKey(oriTask.getCategoryId())) {
List<TaskInfo> taskInfos = Lists.newArrayList();
categoryTasks.put(oriTask.getCategoryId(), taskInfos);
} else {
boolean hasSameSourceId = false;
for (TaskInfo taskInfo : categoryTasks.get(oriTask.getCategoryId())) {
if (taskInfo.getSourceId().equals(oriTask.getSourceId())) {
hasSameSourceId = true;
if (!hasSameSourceId) {
return categoryTasks;
View Code

3.4 调用下载接口的任务

import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.pipes.entity.CrawlerLog;
import com.pipes.out.IDataOut;
import com.pipes.parser.PipeExecuter;
import com.quartz.consts.SourceType;
import com.quartz.consts.StatusType;
import com.quartz.consts.StoreType;
import com.quartz.module.TaskInfo;
import com.quartz.output.*;
import com.quartz.util.CrawlerLogUtil;
import org.apache.log4j.PropertyConfigurator;
import org.elasticsearch.common.Strings;
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
* 离线存储任务
* 注意:上一个任务如未完成,且下一次的定时任务已到执行时间,则需要等待上一个任务
* 执行完成,再进行下一个任务
public class ScheduleJob implements Job {
static Logger logger = LoggerFactory.getLogger(ScheduleJob.class);
public ScheduleJob() {
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
JobDetail jobDetail = jobExecutionContext.getJobDetail();
JSONObject json = new JSONObject();
json.put("jobName", jobDetail.getKey().getName());
json.put("jobGroup", jobDetail.getKey().getGroup());
json.put("triggerName", jobExecutionContext.getTrigger().getKey().getName());
json.put("triggerGroup", jobExecutionContext.getTrigger().getKey().getGroup());
logger.info("job is running: " + json.toString());
JobDataMap dataMap = jobDetail.getJobDataMap();
JSONObject confJson = null;
try {
confJson = JSONObject.parseObject(dataMap.getString(SchedulerFactory.CONF_INFO));
} catch (JSONException e) {
if (confJson == null) {
logger.error("conf is empty: " + json.toString());
// 获取存储类型
TaskInfo taskInfo = new Gson().fromJson(confJson.toString(), TaskInfo.class);
if (!isNeedtoRun(taskInfo)) {
logger.info("no need to run: " + json.toString());
List<IDataOut> dataOuts = Lists.newArrayList();
for (String dbStoreType : taskInfo.getDbStoreTypes()) {
switch (dbStoreType) {
case StoreType.STR_MONGO_STORE:
dataOuts.add(new DataOut2Mongo(taskInfo.getFunctionName(), taskInfo.getSourceName(), taskInfo.getIdBuildEles(), taskInfo.getOutputType(),taskInfo.getProviderName()));
case StoreType.STR_ES_STORE:
dataOuts.add(new DataOut2ES(taskInfo.getCategoryName(),taskInfo.getFunctionName(), taskInfo.getSourceName(), taskInfo.getIdBuildEles(), taskInfo.getIndexBuildEles(), taskInfo.getOutputType(),taskInfo.getProviderName()));
case StoreType.STR_REDIS_STORE:
dataOuts.add(new DataOut2Redis(taskInfo.getSourceName(), taskInfo.getIdBuildEles(), taskInfo.getOutputType(),taskInfo.getProviderName()));
// 创建数据拉取对象,拉取前存储一次,拉取后存储一次
CrawlerLog crawlerLog = createCrawlerLog(taskInfo);
if (dataOuts.size() > 0) {
PipeExecuter.executeSave(taskInfo.getPipelineConf(), dataOuts, crawlerLog);
* 判断job是否需要执行
* @param taskInfo
* @return
public static boolean isNeedtoRun(TaskInfo taskInfo) {
// 实时or离线
if (taskInfo.getSourceType() == SourceType.REAL_TIME_PROCESS) {
logger.warn("the job is real-time process, no need to run");
return false;
// job的上下线状态
if (taskInfo.getSourceStatus() == StatusType.OFFLINE) {
logger.warn("the job status is offline, no need to run");
return false;
// pipeline的配置信息
if (Strings.isNullOrEmpty(taskInfo.getPipelineConf()) || taskInfo.getPipelineConf().trim().length() == 0) {
logger.warn("no pipeline configure info, no need to run");
return false;
// job的存储信息
if (taskInfo.getDbStoreTypes().size() == 0) {
logger.warn("the job store type is 0, no need to store");
return false;
return true;
* 创建拉取数据的日志,以便管理系统查看
* @param taskInfo
* @return
public CrawlerLog createCrawlerLog(TaskInfo taskInfo) {
CrawlerLog crawlerLog = new CrawlerLog();
crawlerLog.setIndexType(taskInfo.getIndexType()); // 增量还是全量

String sourceName = taskInfo.getSourceName();
String sourceZhName = taskInfo.getZhName();
String sourceArr[] = sourceName.split("_");
String sourceZhArr[] = sourceZhName.split("_");
crawlerLog.setCategoryLv2((sourceArr != null && sourceArr.length > 0) ? sourceArr[0] : "");
crawlerLog.setFunctionName((sourceArr != null && sourceArr.length > 1) ? sourceArr[1] : "");
crawlerLog.setProviderName((sourceArr != null && sourceArr.length > 2) ? sourceArr[2] : "");
crawlerLog.setFunctionZhName((sourceZhArr != null && sourceZhArr.length > 1) ? sourceZhArr[1] : "");
crawlerLog.setProviderZhName((sourceZhArr != null && sourceZhArr.length > 2) ? sourceZhArr[2] : "");
return crawlerLog;
View Code

3.5 任务调度工厂


import com.google.gson.Gson;
import com.quartz.consts.PrefixType;
import com.quartz.module.TaskInfo;
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SchedulerFactory {
static Logger logger = LoggerFactory.getLogger(SchedulerFactory.class);
public static final String CONF_INFO = "conf_info";
public static final String DYNAMIC_UPDATE_JOB_NAME = "dynamicUpdateJob";
public static final String DYNAMIC_UPDATE_GROUP_NAME = "dynamicUpdateGroup";
public static final String DYNAMIC_UPDATE_CRONTINFO = "*/30 * * * * ?";
* 将任务加入任务调度中
* @param taskInfo
* @param scheduler
public static void addJob2Scheduler(TaskInfo taskInfo, Scheduler scheduler) {
try {
JobDetail jobDetail = generateJobDetail(taskInfo);
if(jobDetail == null){
logger.error("create job failed!");
Trigger triger = generateTrigger(taskInfo);
if(triger == null){
logger.error("create trigger failed!");
// 加载执行Job及定时器

} catch (SchedulerException e) {
logger.error("create scheduler error, error message: "+e.toString());
public static void addDynamicUpdateJob2Scheduler(Scheduler scheduler) {
try {
JobDetail jobDetail = generateDynamicUpdateJobDetail(DYNAMIC_UPDATE_JOB_NAME, DYNAMIC_UPDATE_GROUP_NAME);
if(jobDetail == null){
logger.error("create job failed!");
if(triger == null){
logger.error("create trigger failed!");
// 加载执行Job及定时器

} catch (SchedulerException e) {
logger.error("create scheduler error, error message: "+e.toString());
* 于信源信息生成对应的job
* @param taskInfo
* @return
public static JobDetail generateJobDetail(TaskInfo taskInfo) {
String jobName = taskInfo.getSourceName();
if(jobName.trim().length() == 0){
logger.error("job name is empty, please check!");
return null;
String jobGroup = taskInfo.getCategoryName();
if(jobGroup.trim().length() == 0){
logger.error("job group is empty, please check!");
return null;
return JobBuilder.newJob(ScheduleJob.class)
.withIdentity(PrefixType.JOB_PREFIX+jobName, PrefixType.JOB_PREFIX+jobGroup)
.usingJobData(CONF_INFO, new Gson().toJson(taskInfo)).build();
public static JobDetail generateDynamicUpdateJobDetail(String jobName, String jobGroup) {
if(jobName.trim().length() == 0){
logger.error("job name is empty, please check!");
return null;
if(jobGroup.trim().length() == 0){
logger.error("job group is empty, please check!");
return null;
return JobBuilder.newJob(DynamicUpdateJob.class)
.withIdentity(PrefixType.JOB_PREFIX+jobName, PrefixType.JOB_PREFIX+jobGroup)
* 基于信源信息生成对应的trigger
* @param taskInfo
* @return
public static Trigger generateTrigger(TaskInfo taskInfo) {
String sourceTriggerName = taskInfo.getSourceName();
if(sourceTriggerName.trim().length() == 0){
logger.error("trigger name is empty, please check!");
return null;
String sourceTriggerGroup = taskInfo.getCategoryName();
if(sourceTriggerGroup.trim().length() == 0){
logger.error("trigger group is empty, please check!");
return null;
String cronInfo = taskInfo.getCronInfo();
if(cronInfo.trim().length() == 0){
logger.error("cron timer info is empty, please check!");
return null;
return TriggerBuilder.newTrigger().withIdentity(PrefixType.TRIGGER_PREFIX+sourceTriggerName,
public static Trigger generateTrigger(String sourceTriggerName, String sourceTriggerGroup, String cronInfo) {
if(sourceTriggerName.trim().length() == 0){
logger.error("trigger name is empty, please check!");
return null;
if(sourceTriggerGroup.trim().length() == 0){
logger.error("trigger group is empty, please check!");
return null;
if(cronInfo.trim().length() == 0){
logger.error("cron timer info is empty, please check!");
return null;
return TriggerBuilder.newTrigger().withIdentity(PrefixType.TRIGGER_PREFIX+sourceTriggerName,
View Code

3.6 动态检测任务更新的Job

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.quartz.consts.PrefixType;
import com.quartz.module.TaskInfo;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class DynamicUpdateJob implements Job{
private static Logger logger = LoggerFactory.getLogger(DynamicUpdateJob.class);
public DynamicUpdateJob(){}
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
JobDetail jobDetail = jobExecutionContext.getJobDetail();
JSONObject json = new JSONObject();
json.put("jobName", jobDetail.getKey().getName());
json.put("jobGroup", jobDetail.getKey().getGroup());
json.put("triggerName", jobExecutionContext.getTrigger().getKey().getName());
json.put("triggerGroup", jobExecutionContext.getTrigger().getKey().getGroup());
logger.info("job is running: "+json.toString());
// 获取当前的调度器
Scheduler scheduler = jobExecutionContext.getScheduler();
// 获取配置信息中的任务(注意需要保持)
List<TaskInfo> confTaskInfos = GenerateTaskInfo.generateTaskInfoFromMysql();
// 获取所有的job信息
List<JobKey> schedulerJobKeys = acquireJobKeysWithinSceduler(scheduler);
// 1. 配置任务不存在,而sheduler相关任务存在,则进行下线处理
for(JobKey schedulerJobKey : schedulerJobKeys ){
boolean hasSameJobKeyInConfTask = false;
for(TaskInfo confTaskInfo : confTaskInfos){
hasSameJobKeyInConfTask = true;
try {
logger.info("delete offline job: "+schedulerJobKey.toString());
} catch (SchedulerException e) {
logger.error("delete offline job error: "+json.toString());
// 2 配置任务与调度器任务比较
for(TaskInfo confTaskInfo : confTaskInfos){
JobKey confJobKey = generateJobKey(confTaskInfo);
boolean hasSameJob = false;
for(JobKey schedulerJobKey : schedulerJobKeys ){
hasSameJob = true;
logger.info("has same jobKey: "+confJobKey);
JobDetail schedulerJobDetail = null;
try {
schedulerJobDetail = scheduler.getJobDetail(confJobKey);
} catch (SchedulerException e) {
logger.error("get job detail from scheduler error: "+confJobKey);
if(schedulerJobDetail == null) continue;
// 1) 是否需要下线
try {
logger.info("has same jobKey and offline the job "+confJobKey);
} catch (SchedulerException e) {
logger.error("delete offline job error: "+confJobKey);
// 2) 是否需要更新任务
TaskInfo schedulerTaskInfo = parseTaskInfoFromJobDataMap(schedulerJobDetail);
logger.info("confTaskInfo: " + confTaskInfo);
logger.info("schedulerTaskInfo: " + schedulerTaskInfo);
try {
logger.info("has same jobKey and update the job "+confJobKey);
SchedulerFactory.addJob2Scheduler(confTaskInfo, scheduler);
} catch (SchedulerException e) {
logger.error("update scheduler info error: "+confJobKey);
logger.info("the job info is same "+confJobKey);
}else{ // 创建新的Job
// 1) 是否满足上线的条件
logger.info("the status is offline, no need to create new job: "+confJobKey);
logger.info("no same jobKey and create job "+confJobKey);
// 2) 上线

SchedulerFactory.addJob2Scheduler(confTaskInfo, scheduler);
protected List<JobKey> acquireJobKeysWithinSceduler(Scheduler scheduler){
List<JobKey> jobKeys = Lists.newArrayList();
try {
for(String groupName : scheduler.getJobGroupNames()){
for(JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName))){
} catch (SchedulerException e) {
return jobKeys;
TaskInfo parseTaskInfoFromJobDataMap(JobDetail jobDetail){
try {
String confInfo = jobDetail.getJobDataMap().getString(SchedulerFactory.CONF_INFO);
return new Gson().fromJson(confInfo, TaskInfo.class);
} catch (Exception e) {
logger.error("parse task info from JobDataMap error!");
return null;
protected JobKey generateJobKey(TaskInfo taskInfo){
return generateJobKey(taskInfo.getSourceName(), taskInfo.getCategoryName());
protected JobKey generateJobKey(String jobName, String jobGroup){
return JobKey.jobKey(PrefixType.JOB_PREFIX+jobName,PrefixType.JOB_PREFIX+jobGroup);
View Code

3.7 Es数据库存储

import com.alibaba.fastjson.JSONObject;
import com.pipes.out.IDataOut;
import com.quartz.conf.Configuration;
import com.quartz.conf.OcpConfHelper;
import com.quartz.util.IdBuilder;
import com.quartz.util.OutputTypeTransform;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
* Created by songwang4 on 2017/6/7.
public class DataOut2ES implements IDataOut, IDataClose {
static Logger logger = LoggerFactory.getLogger(DataOut2ES.class);
static TransportClient client;
String indexName; // 默认为ocp

String typeName;
String sourceName;
List<String> indexBuildEles;
List<String> idBuilderEles;
Map<String,String> outputType;
String providerName;
/**
 * Creates a writer bound to an index and a type.
 *
 * @param indexName target index name
 * @param type      target type name
 */
public DataOut2ES(String indexName, String type) {
    this.indexName = indexName;
    this.typeName = type;
}

/**
 * Creates a writer that copies only the given fields into each indexed document.
 *
 * @param indexName      target index name
 * @param type           target type name
 * @param indexBuildEles names of the record fields to copy into the indexed document
 */
public DataOut2ES(String indexName, String type, List<String> indexBuildEles) {
    // Delegate to the two-argument constructor: without this call indexName and
    // typeName stay null for this constructor and for every constructor chained to it.
    this(indexName, type);
    this.indexBuildEles = indexBuildEles;
}

/**
 * Creates a writer that also knows which fields feed the document id.
 *
 * @param indexName      target index name
 * @param type           target type name
 * @param idBuilderEles  values handed to IdBuilder.generateId when ids are built
 * @param indexBuildEles names of the record fields to copy into the indexed document
 */
public DataOut2ES(String indexName, String type, List<String> idBuilderEles, List<String> indexBuildEles) {
    this(indexName, type, indexBuildEles);
    this.idBuilderEles = idBuilderEles;
}

/**
 * Creates a writer that additionally carries a source name for id generation.
 *
 * @param indexName      target index name
 * @param type           target type name
 * @param sourceName     first argument handed to IdBuilder.generateId
 * @param idBuilderEles  values handed to IdBuilder.generateId when ids are built
 * @param indexBuildEles names of the record fields to copy into the indexed document
 */
public DataOut2ES(String indexName, String type, String sourceName, List<String> idBuilderEles,
        List<String> indexBuildEles) {
    this(indexName, type, idBuilderEles, indexBuildEles);
    this.sourceName = sourceName;
}

/**
 * Creates a fully configured writer.
 *
 * @param indexName      target index name
 * @param type           target type name
 * @param sourceName     first argument handed to IdBuilder.generateId
 * @param idBuilderEles  values handed to IdBuilder.generateId when ids are built
 * @param indexBuildEles names of the record fields to copy into the indexed document
 * @param outputType     mapping handed to OutputTypeTransform.transform before indexing
 * @param providerName   provider name kept on the instance
 */
public DataOut2ES(String indexName, String type, String sourceName, List<String> idBuilderEles,
        List<String> indexBuildEles, Map<String, String> outputType, String providerName) {
    this(indexName, type, sourceName, idBuilderEles, indexBuildEles);
    this.outputType = outputType;
    this.providerName = providerName;
}
public static void init() {
if(client == null){
Configuration conf = OcpConfHelper.getInstance().getOcpConf();
String esClusterName = conf.getStringValue("ocp_es_cluster_name", "");
String esIp = conf.getStringValue("ocp_es_ip", "");
int esPort = conf.getIntValue("ocp_es_port", "");
Settings settings = Settings.builder()
.put("client.transport.sniff", true)
.put("client.transport.ping_timeout", "120s")
try {
client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esIp),esPort));
} catch (UnknownHostException e) {
* 批量写入
* @param datas
public void save(List<JSONObject> datas) {
// 批量的插入数据
BulkRequestBuilder bulkRequest = client.prepareBulk();
for(JSONObject data : datas){
data = OutputTypeTransform.transform(data,outputType);

String id64 = IdBuilder.generateId(this.sourceName, data, this.idBuilderEles);
if(id64.trim().length() == 0) continue;
JSONObject indexJson = new JSONObject();
for(String indexBuildEle : this.indexBuildEles){
indexJson.put(indexBuildEle, data.get(indexBuildEle));
logger.info("no json fields, so no need to save");
bulkRequest.add(client.prepareIndex(indexName, typeName, id64).setSource(indexJson.toString()));
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
logger.error("insert data 2 es error "+indexName);
public void saveWithoutIndexBuilds(List<JSONObject> datas) {
// 批量的插入数据
BulkRequestBuilder bulkRequest = client.prepareBulk();
for(JSONObject data : datas){
bulkRequest.add(client.prepareIndex(indexName, typeName).setSource(data.toString()));
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
logger.error("insert data 2 es error "+indexName);
public void saveWithoutIndexBuilds2(List<JSONObject> datas) {
// 批量的插入数据
BulkRequestBuilder bulkRequest = client.prepareBulk();
for(JSONObject data : datas){
String _id = data.getString("_id");
JSONObject source = data.getJSONObject("_source");
bulkRequest.add(client.prepareIndex(indexName, typeName,_id).setSource(source.toString()));
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
logger.error("insert data 2 es error "+indexName);
* 判断索引是否存在
* @param indexName
* @return
public boolean isExistsIndex(String indexName){
IndicesExistsResponse response = client.admin().indices()
.exists(new IndicesExistsRequest().indices(new String[]{indexName})).actionGet();
return response.isExists();
* 创建索引信息
* @param indexName
* @return
public boolean createIndex(String indexName){
try {
CreateIndexResponse indexResponse = this.client
return indexResponse.isAcknowledged();
} catch (ElasticsearchException e) {
return false;
public void save(Object data) {
if(this.indexBuildEles.size() == 0){
logger.error("index fields are empty in es, no index need to save, info: " + data.toString());
// 逐条插入数据
JSONObject json = null;
try {
json = (JSONObject)data;
} catch (Exception e) {
logger.error("trans data to json error in es :" + data.toString());
if(json == null){
logger.error("trans data to Json error in es, info " + data.toString());
json = OutputTypeTransform.transform(json,outputType);
// 构建索引id
String id64 = IdBuilder.generateId(this.sourceName, json, this.idBuilderEles);
if(id64.trim().length() == 0){
logger.error("generate 64 bit id is null,please check: " + data.toString());
JSONObject indexJson = new JSONObject();
for(String indexBuildEle : this.indexBuildEles){
indexJson.put(indexBuildEle, json.get(indexBuildEle));
logger.info("no json fields, so no need to save");
logger.info("index info: "+indexJson);
IndexResponse response = client.prepareIndex(this.indexName, this.typeName, id64).setSource(indexJson.toString()).get();
if(response.status() != RestStatus.CREATED && response.status() != RestStatus.OK){
logger.error("index error in es, status is "+response.status().getStatus()+"info: " + data.toString());
// NOTE(review): the body of close() is missing from this listing, so what it releases cannot
// be stated from here. The class declares IDataClose; presumably this method belongs to that
// interface and closes the shared client — confirm against the original source.
public void close() {
View Code


4. 集群模式


# Configure Main Scheduler Properties
org.quartz.scheduler.instanceName: OcpScheduler
# Nodes of one cluster share instanceName but each needs its own instanceId.
# AUTO lets Quartz generate one per node, so the same file can be deployed everywhere.
org.quartz.scheduler.instanceId: AUTO
org.quartz.scheduler.skipUpdateCheck: true
# Configure ThreadPool
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 50
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
# Configure JobStore
org.quartz.jobStore.misfireThreshold: 120000
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties: false
org.quartz.jobStore.tablePrefix: QRTZ_
org.quartz.jobStore.dataSource: ocpQzDs
org.quartz.jobStore.isClustered: true
org.quartz.jobStore.clusterCheckinInterval: 60000
# Configure Datasources
org.quartz.dataSource.ocpQzDs.driver: com.mysql.jdbc.Driver
# The JDBC URL is required but absent from this listing. Set it for your environment, e.g.:
#org.quartz.dataSource.ocpQzDs.URL: jdbc:mysql://<host>:3306/<database>
org.quartz.dataSource.ocpQzDs.user: test
org.quartz.dataSource.ocpQzDs.password: test
# The key is "maxConnections" (plural); the singular spelling is not a Quartz property.
# For a busy scheduler consider a pool of about threadCount + 2 connections.
org.quartz.dataSource.ocpQzDs.maxConnections: 30
# Configure Plugins
org.quartz.plugin.shutdownHook.class: org.quartz.plugins.management.ShutdownHookPlugin
org.quartz.plugin.shutdownHook.cleanShutdown: true
#org.quartz.plugin.triggHistory.class: org.quartz.plugins.history.LoggingJobHistoryPlugin
View Code



以上就是完美秋天为你收集整理的Quartz使用(6) - Quartz项目实战的全部内容,希望文章能够帮你解决Quartz使用(6) - Quartz项目实战所遇到的程序开发问题。



评论列表共有 0 条评论
