概述
本篇博文将阐述在项目工作中使用Quartz的情况,包含项目背景、项目框架、Quartz集群部署等方面,重点讲述如何在实际项目中使用Quartz。
1. 背景
因项目需求,需要定时调用数据下载接口,并将数据存储至诸如mongo、redis、elasticsearch等数据库或缓存中。具体涉及到的需求如下:
a. 调用接口的任务均从mongo数据库读取;
b. 任务的个数随着业务量的增加而增加;
c. 每个调用任务的定时执行时间可能不同,且定时执行时间在mongo中可配置;
d. 任务的执行需要动态更新,如检测到某一任务的定时时间发生变化,则该任务的执行也需要实时调整;
e. mongo、redis、elasticsearch等数据库中所存储的字段也由mongo进行配置;
f. 任务执行需要具备较高的实时性、较强的可靠性以及较好的可扩展性。
综合上述需求,经过一番调研,发现任务调度框架Quartz可以满足项目需求。
2. 框架
基于项目的需求,结合任务调度框架Quartz,大体的流程框架如下图所示(图略,流程说明如下):
1) 首先从mongo中加载任务配置信息
2) 将任务的配置信息初始化至Quartz
3) 通过Quartz的Job任务实现定时调用下载接口任务
4) 将下载的数据依据配置,存储至数据库中
5) 定时检测任务通过定时扫描mongo数据库,查看相关任务信息的配置是否发生变化,如果发生变化,则进行动态更新
6) 为了实现高可用性、可扩展性,可以直接使用Quartz原生的集群特性。
3. 核心代码
核心代码涵盖上述流程图中的相关环节;出于项目保密的考虑,部分相关信息已做隐藏处理。
3.1 任务主流程
import com.quartz.conf.Configuration; import com.quartz.conf.OcpConfHelper; import com.quartz.module.TaskInfo; import org.apache.log4j.PropertyConfigurator; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; public class SchedulerRunner { static Logger logger = LoggerFactory.getLogger(SchedulerRunner.class); public static void main(String[] args) { // 加载日志配置文件 PropertyConfigurator.configure("./conf/log4j.properties"); // 加载quartz配置文件 System.setProperty("org.quartz.properties", "./conf/quartz.properties"); // 执行任务解析与调度 run(); } public static void run(){ // 获取配置信息表 List<TaskInfo> taskInfos = GenerateTaskInfo.generateTaskInfoFromMysql(); if(taskInfos.size() == 0){ logger.info("there is no tasks from mongoInfo"); return; } // 过滤下线任务 taskInfos = GenerateTaskInfo.filterTask(taskInfos); if(taskInfos.size() == 0){ logger.info("all tasks if offline, no need to run"); return; } Scheduler scheduler = null; try { scheduler = StdSchedulerFactory.getDefaultScheduler(); } catch (SchedulerException e) { e.printStackTrace(); } if(scheduler == null){ logger.error("create scheduler failed"); return; } if(isSchedulerClear()){ clearSchedulerJob(scheduler); } // 加入任务调度 for(TaskInfo task : taskInfos){ SchedulerFactory.addJob2Scheduler(task, scheduler); } // 加入动态更新任务 SchedulerFactory.addDynamicUpdateJob2Scheduler(scheduler); // 开启任务 try { scheduler.start(); } catch (SchedulerException e) { logger.error("start scheduler error!"); } } public static void clearSchedulerJob(Scheduler scheduler){ try { scheduler.clear(); } catch (SchedulerException e) { logger.error("clear scheduler error!"); } } /** * 基于配置文件中的信息,加载调度器开始运行时的清洗标识 * @return */ private static boolean isSchedulerClear(){ Configuration conf = OcpConfHelper.getInstance().getOcpConf(); return conf.getBooleanValue("cleanSchedulerFlag", "true"); } }
3.2 封装任务对象
import java.util.List; import java.util.Map; public class TaskInfo { protected String categoryId; // 业务Id protected String categoryName; // 业务名称 protected String sourceId; // 信源Id protected String sourceName; // 信源名称 protected int sourceStatus; // 信源状态 protected String pipelineConf; // 信源pipeline配置信息 protected List<String> dbStoreTypes; // 业务的存储类型 protected String esConfInfo; // ES存储配置 protected String dbConfInfo; // DB存储配置 protected String cronInfo; // 定时任务信息 protected int sourceType; // 实时更新还是离线更新 protected List<String> indexBuildEles; // 更新索引的信息 protected List<String> idBuildEles; // id的构建因素 protected String indexType; // 全量或增量 protected String categoryLevel1; // 一级分类 protected String zhName; // 中文信息 protected Map<String,String> outputType; //输出参数名及其类型 protected String providerName; protected String functionName; //category_function名称 public String getProviderName() { return providerName; } public void setProviderName(String providerName) { this.providerName = providerName; } public String getCategoryId() { return categoryId; } public void setCategoryId(String categoryId) { this.categoryId = categoryId; } public String getCategoryName() { return categoryName; } public void setCategoryName(String categoryName) { this.categoryName = categoryName; } public String getSourceId() { return sourceId; } public void setSourceId(String sourceId) { this.sourceId = sourceId; } public String getSourceName() { return sourceName; } public void setSourceName(String sourceName) { this.sourceName = sourceName; } public int getSourceStatus() { return sourceStatus; } public void setSourceStatus(int sourceStatus) { this.sourceStatus = sourceStatus; } public String getPipelineConf() { return pipelineConf; } public void setPipelineConf(String pipelineConf) { this.pipelineConf = pipelineConf; } public String getEsConfInfo() { return esConfInfo; } public void setEsConfInfo(String esConfInfo) { this.esConfInfo = esConfInfo; } public String getDbConfInfo() { return dbConfInfo; } public 
void setDbConfInfo(String dbConfInfo) { this.dbConfInfo = dbConfInfo; } public String getCronInfo() { return cronInfo; } public void setCronInfo(String cronInfo) { this.cronInfo = cronInfo; } public int getSourceType() { return sourceType; } public void setSourceType(int sourceType) { this.sourceType = sourceType; } public List<String> getIdBuildEles() { return idBuildEles; } public void setIdBuildEles(List<String> idBuildEles) { this.idBuildEles = idBuildEles; } public List<String> getIndexBuildEles() { return indexBuildEles; } public void setIndexBuildEles(List<String> indexBuildEles) { this.indexBuildEles = indexBuildEles; } public String getIndexType() { return indexType; } public void setIndexType(String indexType) { this.indexType = indexType; } public String getCategoryLevel1() { return categoryLevel1; } public void setCategoryLevel1(String categoryLevel1) { this.categoryLevel1 = categoryLevel1; } public String getZhName() { return zhName; } public void setZhName(String zhName) { this.zhName = zhName; } public TaskInfo(){} public List<String> getDbStoreTypes() { return dbStoreTypes; } public void setDbStoreTypes(List<String> dbStoreTypes) { this.dbStoreTypes = dbStoreTypes; } public Map<String, String> getOutputType() { return outputType; } public void setOutputType(Map<String, String> outputType) { this.outputType = outputType; } public String getFunctionName() { return functionName; } public void setFunctionName(String functionName) { this.functionName = functionName; } /** * 是否有相同的定时信息 * @param taskInfo * @return */ public boolean hasSameCronInfo(TaskInfo taskInfo){ if(taskInfo == null) return false; return this.getCronInfo().equalsIgnoreCase(taskInfo.getCronInfo()); } @Override public String toString() { return "TaskInfo{" + "categoryId='" + categoryId + ''' + ", categoryName='" + categoryName + ''' + ", sourceId='" + sourceId + ''' + ", sourceName='" + sourceName + ''' + ", sourceStatus=" + sourceStatus + ", pipelineConf='" + pipelineConf + ''' + ", 
dbStoreTypes=" + dbStoreTypes + ", esConfInfo='" + esConfInfo + ''' + ", dbConfInfo='" + dbConfInfo + ''' + ", cronInfo='" + cronInfo + ''' + ", sourceType=" + sourceType + ", indexBuildEles=" + indexBuildEles + ", idBuildEles=" + idBuildEles + ", indexType='" + indexType + ''' + ", categoryLevel1='" + categoryLevel1 + ''' + ", zhName='" + zhName + ''' + ", outputType='" + outputType + ''' + ", providerName='" + providerName + ''' + ", functionName='" + functionName + ''' + '}'; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; TaskInfo taskInfo = (TaskInfo) o; if (sourceStatus != taskInfo.sourceStatus) return false; if (sourceType != taskInfo.sourceType) return false; if (categoryName != null ? !categoryName.equals(taskInfo.categoryName) : taskInfo.categoryName != null) return false; if (sourceName != null ? !sourceName.equals(taskInfo.sourceName) : taskInfo.sourceName != null) return false; if (providerName != null ? !providerName.equals(taskInfo.providerName) : taskInfo.providerName != null) return false; if (pipelineConf != null ? !pipelineConf.equals(taskInfo.pipelineConf) : taskInfo.pipelineConf != null) return false; if (dbStoreTypes != null ? !dbStoreTypes.equals(taskInfo.dbStoreTypes) : taskInfo.dbStoreTypes != null) return false; if (esConfInfo != null ? !esConfInfo.equals(taskInfo.esConfInfo) : taskInfo.esConfInfo != null) return false; if (dbConfInfo != null ? !dbConfInfo.equals(taskInfo.dbConfInfo) : taskInfo.dbConfInfo != null) return false; if (cronInfo != null ? !cronInfo.equals(taskInfo.cronInfo) : taskInfo.cronInfo != null) return false; if (indexBuildEles != null ? !indexBuildEles.equals(taskInfo.indexBuildEles) : taskInfo.indexBuildEles != null) return false; if (idBuildEles != null ? !idBuildEles.equals(taskInfo.idBuildEles) : taskInfo.idBuildEles != null) return false; if (indexType != null ? 
!indexType.equals(taskInfo.indexType) : taskInfo.indexType != null) return false; if (categoryLevel1 != null ? !categoryLevel1.equals(taskInfo.categoryLevel1) : taskInfo.categoryLevel1 != null) return false; if (outputType != null ? !outputType.equals(taskInfo.outputType) : taskInfo.outputType != null) return false; if (functionName != null ? !functionName.equals(taskInfo.functionName) : taskInfo.functionName != null) return false; return zhName != null ? zhName.equals(taskInfo.zhName) : taskInfo.zhName == null; } @Override public int hashCode() { int result = categoryName != null ? categoryName.hashCode() : 0; result = 31 * result + (sourceName != null ? sourceName.hashCode() : 0); result = 31 * result + (providerName != null ? providerName.hashCode() : 0); result = 31 * result + sourceStatus; result = 31 * result + (pipelineConf != null ? pipelineConf.hashCode() : 0); result = 31 * result + (dbStoreTypes != null ? dbStoreTypes.hashCode() : 0); result = 31 * result + (esConfInfo != null ? esConfInfo.hashCode() : 0); result = 31 * result + (dbConfInfo != null ? dbConfInfo.hashCode() : 0); result = 31 * result + (cronInfo != null ? cronInfo.hashCode() : 0); result = 31 * result + sourceType; result = 31 * result + (indexBuildEles != null ? indexBuildEles.hashCode() : 0); result = 31 * result + (idBuildEles != null ? idBuildEles.hashCode() : 0); result = 31 * result + (indexType != null ? indexType.hashCode() : 0); result = 31 * result + (categoryLevel1 != null ? categoryLevel1.hashCode() : 0); result = 31 * result + (zhName != null ? zhName.hashCode() : 0); result = 31 * result + (outputType != null ? outputType.hashCode() : 0); result = 31 * result + (functionName != null ? functionName.hashCode() : 0); return result; } }
3.3 任务的构造及初始化
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.gson.Gson; import com.quartz.consts.SourceType; import com.quartz.consts.Sql; import com.quartz.consts.StatusType; import com.quartz.module.TaskInfo; import com.quartz.util.MongoUtil; import com.quartz.util.MySqlUtil; import com.quartz.util.TimeUtil; import com.mongodb.BasicDBObject; import com.mongodb.DBCollection; import com.mongodb.DBCursor; import com.mongodb.DBObject; import org.bson.types.ObjectId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.util.*; /** * 获取调度任务的相关信息 * Created by songwang4 on 2017/6/7. */ public class GenerateTaskInfo { static Logger logger = LoggerFactory.getLogger(GenerateTaskInfo.class); static DBCollection sourceColl = MongoUtil.createOcpSourceDB(); static DBCollection categoryColl = MongoUtil.createOcpCategoryDB(); /** * 从数据库中读取任务相关信息 * * @return */ public static List<TaskInfo> generateTaskInfoFromMongo() { // 将任务信息进行封装 List<TaskInfo> tasks = Lists.newArrayList(); TaskInfo task = null; DBCursor sourceCur = sourceColl.find(); DBObject sourceObj = null; DBObject categoryObj = null; while (sourceCur.hasNext()) { sourceObj = sourceCur.next(); task = new TaskInfo(); String sourceName = sourceObj.get("sourceName").toString(); String categoryName = sourceObj.get("category").toString(); // 基于业务名查找对应的业务表信息 categoryObj = categoryColl.findOne(new BasicDBObject("catName", categoryName)); if (categoryObj == null) { logger.error("no category found through source: " + sourceName); continue; } task.setCategoryId(categoryObj.get("_id").toString()); // 业务Id task.setCategoryName(categoryName); // 业务名 List<String> dbStoreTypes = Lists.newArrayList(); if (categoryObj.containsField("storeType")) { try { JSONArray storeTypeArr = 
JSON.parseArray(categoryObj.get("storeType").toString()); for (int i = 0; i < storeTypeArr.size(); i++) { dbStoreTypes.add(storeTypeArr.getString(i)); } } catch (Exception e) { } } task.setDbStoreTypes(dbStoreTypes); // 存储类型 task.setCategoryLevel1(categoryObj.get("parent").toString()); // 一级业务分类 task.setZhName(sourceObj.get("zhName").toString()); task.setDbConfInfo(categoryObj.containsField("db") ? categoryObj.get("db").toString() : categoryName); // DB配置 task.setEsConfInfo(categoryObj.containsField("es") ? categoryObj.get("es").toString() : categoryName); // ES配置 task.setIndexBuildEles(extractBuilderEles(categoryObj, "isIndex", "itemName")); // 构建ES索引信息 task.setIdBuildEles(extractBuilderEles(categoryObj, "isGK", "itemName")); // 构建id的信息元素 task.setSourceId(sourceObj.get("_id").toString()); // 信源Id task.setSourceName(sourceName); // 信源名称 int status = StatusType.OFFLINE; if (sourceObj.containsField("status")) { String statusType = sourceObj.get("status").toString(); if (statusType.equals(StatusType.STR_ONLINE)) { status = StatusType.ONLINE; } } task.setSourceStatus(status); // 信源的上下线状态 int sourceType = SourceType.REAL_TIME_PROCESS; if (sourceObj.containsField("type")) { String strStatusType = sourceObj.get("type").toString(); if (strStatusType.equals(SourceType.STR_OFF_LINE_PROCESS)) { sourceType = SourceType.OFF_LINE_PROCESS; } } task.setSourceType(sourceType); // 离线或实时处理 task.setIndexType(sourceObj.containsField("indexType") ? sourceObj.get("indexType").toString() : ""); // 增量或全量标识 // 定时时间配置 task.setCronInfo(sourceObj.containsField("timerInfo") ? sourceObj.get("timerInfo").toString() : ""); if (task.getCronInfo().trim().length() == 0) { task.setCronInfo(generateCronInfo(sourceObj)); } task.setPipelineConf(sourceObj.containsField("mappingWorkflow") ? 
sourceObj.get("mappingWorkflow").toString() : ""); // pipeline配置信息 tasks.add(task); } sourceCur.close(); return tasks; } /** * 构建生成id或es的信息元素 * * @param categoryObj * @param queryField * @param retureField * @return */ public static List<String> extractBuilderEles(DBObject categoryObj, String queryField, String retureField) { List<String> builerEles = Lists.newArrayList(); JSONArray dataItemArr = null; try { dataItemArr = JSON.parseArray(categoryObj.get("dataItems").toString()); } catch (JSONException e) { } if (dataItemArr != null && dataItemArr.size() > 0) { JSONObject dataItemJson = null; for (int i = 0; i < dataItemArr.size(); i++) { dataItemJson = dataItemArr.getJSONObject(i); if (dataItemJson.containsKey(queryField) && dataItemJson.getBoolean(queryField)) { builerEles.add(dataItemJson.getString(retureField).trim()); } } } return builerEles; } /** * 基于业务表中的信息构造定时任务表达式 * * @param sourceObj * @return */ public static String generateCronInfo(DBObject sourceObj) { String updateTimeType = ""; String updateTimeCycle = ""; if (sourceObj.containsField("updateType")) { updateTimeType = sourceObj.get("updateType").toString(); } if (sourceObj.containsField("updateCycle")) { updateTimeCycle = sourceObj.get("updateCycle").toString(); } if (updateTimeType.trim().length() == 0 || updateTimeCycle.trim().length() == 0) { return ""; } StringBuilder sb = new StringBuilder(); Date date = null; if (updateTimeType.equalsIgnoreCase("YEAR")) { date = TimeUtil.parseDate(updateTimeCycle, "MM-dd HH:mm"); if (date == null) { try { sb.append(TimeUtil.extractFixedTimeByDay(Integer.parseInt(updateTimeCycle), 0, 0)); } catch (NumberFormatException e) { } } else { sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ") .append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ") .append(TimeUtil.extractFixedTime(date, Calendar.DATE)).append(" ") .append(TimeUtil.extractFixedTime(date, Calendar.MONTH) + 1).append(" ? 
*"); } } if (updateTimeType.equalsIgnoreCase("MONTH")) { date = TimeUtil.parseDate(updateTimeCycle, "dd HH:mm"); if (date == null) return ""; sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ") .append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ") .append(TimeUtil.extractFixedTime(date, Calendar.DATE)).append(" * ?"); } if (updateTimeType.equalsIgnoreCase("DAY")) { date = TimeUtil.parseDate(updateTimeCycle, "HH:mm"); if (date == null) return ""; sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ") .append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" * * ?"); } if (updateTimeType.equalsIgnoreCase("WEEK")) { String weekDay = "1"; if (sourceObj.containsField("weekDay")) { weekDay = sourceObj.get("weekDay").toString(); } date = TimeUtil.parseDate(updateTimeCycle, "HH:mm"); if (date == null) return ""; sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ") .append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ? 
* ") .append(TimeUtil.extractFixedTime(weekDay)); } if (updateTimeType.equalsIgnoreCase("HOUR")) { try { int hour = Integer.parseInt(updateTimeCycle); sb.append(TimeUtil.extractFixedTimeByHour(hour, 0)); } catch (NumberFormatException e) { } } if (updateTimeType.equalsIgnoreCase("MINUTE")) { try { int minute = Integer.parseInt(updateTimeCycle); sb.append(TimeUtil.extractFixedTimeByMinute(minute)); } catch (NumberFormatException e) { } } if (updateTimeType.equalsIgnoreCase("SECOND")) { sb.append("*/").append(updateTimeCycle).append(" * * * * ?"); } return sb.toString(); } /** * 过滤下线的任务 * * @param tasks * @return */ public static List<TaskInfo> filterTask(List<TaskInfo> tasks) { List<TaskInfo> taskInfos = Lists.newArrayList(); for (TaskInfo taskInfo : tasks) { // 过滤下线的信源状态或实时的信源 if (taskInfo.getSourceStatus() == StatusType.OFFLINE || taskInfo.getSourceType() != SourceType.OFF_LINE_PROCESS) { continue; } taskInfos.add(taskInfo); } return taskInfos; } /** * 基于业务名称对任务进行分组 * * @param oriTasks * @return */ public static Map<String, List<TaskInfo>> groupTaskByCategory(List<TaskInfo> oriTasks) { Map<String, List<TaskInfo>> categoryTasks = Maps.newHashMap(); for (TaskInfo oriTask : oriTasks) { if (!categoryTasks.containsKey(oriTask.getCategoryId())) { List<TaskInfo> taskInfos = Lists.newArrayList(); taskInfos.add(oriTask); categoryTasks.put(oriTask.getCategoryId(), taskInfos); } else { boolean hasSameSourceId = false; for (TaskInfo taskInfo : categoryTasks.get(oriTask.getCategoryId())) { if (taskInfo.getSourceId().equals(oriTask.getSourceId())) { hasSameSourceId = true; break; } } if (!hasSameSourceId) { categoryTasks.get(oriTask.getCategoryId()).add(oriTask); } } } return categoryTasks; } }
3.4 调用下载接口的任务
import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Lists; import com.google.gson.Gson; import com.pipes.entity.CrawlerLog; import com.pipes.out.IDataOut; import com.pipes.parser.PipeExecuter; import com.quartz.consts.SourceType; import com.quartz.consts.StatusType; import com.quartz.consts.StoreType; import com.quartz.module.TaskInfo; import com.quartz.output.*; import com.quartz.util.CrawlerLogUtil; import org.apache.log4j.PropertyConfigurator; import org.elasticsearch.common.Strings; import org.quartz.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; /** * 离线存储任务 * 注意:上一个任务如未完成,且下一次的定时任务已到执行时间,则需要等待上一个任务 * 执行完成,再进行下一个任务 */ @DisallowConcurrentExecution public class ScheduleJob implements Job { static Logger logger = LoggerFactory.getLogger(ScheduleJob.class); public ScheduleJob() { } @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { JobDetail jobDetail = jobExecutionContext.getJobDetail(); JSONObject json = new JSONObject(); json.put("jobName", jobDetail.getKey().getName()); json.put("jobGroup", jobDetail.getKey().getGroup()); json.put("triggerName", jobExecutionContext.getTrigger().getKey().getName()); json.put("triggerGroup", jobExecutionContext.getTrigger().getKey().getGroup()); logger.info("job is running: " + json.toString()); JobDataMap dataMap = jobDetail.getJobDataMap(); JSONObject confJson = null; try { confJson = JSONObject.parseObject(dataMap.getString(SchedulerFactory.CONF_INFO)); } catch (JSONException e) { } if (confJson == null) { logger.error("conf is empty: " + json.toString()); return; } // 获取存储类型 TaskInfo taskInfo = new Gson().fromJson(confJson.toString(), TaskInfo.class); if (!isNeedtoRun(taskInfo)) { logger.info("no need to run: " + json.toString()); return; } List<IDataOut> dataOuts = Lists.newArrayList(); for (String dbStoreType : taskInfo.getDbStoreTypes()) { switch (dbStoreType) { case 
StoreType.STR_MONGO_STORE: dataOuts.add(new DataOut2Mongo(taskInfo.getFunctionName(), taskInfo.getSourceName(), taskInfo.getIdBuildEles(), taskInfo.getOutputType(),taskInfo.getProviderName())); break; case StoreType.STR_ES_STORE: dataOuts.add(new DataOut2ES(taskInfo.getCategoryName(),taskInfo.getFunctionName(), taskInfo.getSourceName(), taskInfo.getIdBuildEles(), taskInfo.getIndexBuildEles(), taskInfo.getOutputType(),taskInfo.getProviderName())); break; case StoreType.STR_REDIS_STORE: dataOuts.add(new DataOut2Redis(taskInfo.getSourceName(), taskInfo.getIdBuildEles(), taskInfo.getOutputType(),taskInfo.getProviderName())); break; } } // 创建数据拉取对象,拉取前存储一次,拉取后存储一次 CrawlerLog crawlerLog = createCrawlerLog(taskInfo); if (dataOuts.size() > 0) { PipeExecuter.executeSave(taskInfo.getPipelineConf(), dataOuts, crawlerLog); } } /** * 判断job是否需要执行 * * @param taskInfo * @return */ public static boolean isNeedtoRun(TaskInfo taskInfo) { // 实时or离线 if (taskInfo.getSourceType() == SourceType.REAL_TIME_PROCESS) { logger.warn("the job is real-time process, no need to run"); return false; } // job的上下线状态 if (taskInfo.getSourceStatus() == StatusType.OFFLINE) { logger.warn("the job status is offline, no need to run"); return false; } // pipeline的配置信息 if (Strings.isNullOrEmpty(taskInfo.getPipelineConf()) || taskInfo.getPipelineConf().trim().length() == 0) { logger.warn("no pipeline configure info, no need to run"); return false; } // job的存储信息 if (taskInfo.getDbStoreTypes().size() == 0) { logger.warn("the job store type is 0, no need to store"); return false; } return true; } /** * 创建拉取数据的日志,以便管理系统查看 * * @param taskInfo * @return */ public CrawlerLog createCrawlerLog(TaskInfo taskInfo) { CrawlerLog crawlerLog = new CrawlerLog(); crawlerLog.setIndexType(taskInfo.getIndexType()); // 增量还是全量 crawlerLog.setCategoryLv1(taskInfo.getCategoryLevel1()); String sourceName = taskInfo.getSourceName(); String sourceZhName = taskInfo.getZhName(); String sourceArr[] = sourceName.split("_"); String 
sourceZhArr[] = sourceZhName.split("_"); crawlerLog.setCategoryLv2((sourceArr != null && sourceArr.length > 0) ? sourceArr[0] : ""); crawlerLog.setFunctionName((sourceArr != null && sourceArr.length > 1) ? sourceArr[1] : ""); crawlerLog.setProviderName((sourceArr != null && sourceArr.length > 2) ? sourceArr[2] : ""); crawlerLog.setFunctionZhName((sourceZhArr != null && sourceZhArr.length > 1) ? sourceZhArr[1] : ""); crawlerLog.setProviderZhName((sourceZhArr != null && sourceZhArr.length > 2) ? sourceZhArr[2] : ""); crawlerLog.setId(); return crawlerLog; } }
3.5 任务调度工厂
工厂用于生成任务的触发器Trigger,以及创建任务Job。
import com.google.gson.Gson; import com.quartz.consts.PrefixType; import com.quartz.module.TaskInfo; import org.quartz.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SchedulerFactory { static Logger logger = LoggerFactory.getLogger(SchedulerFactory.class); public static final String CONF_INFO = "conf_info"; public static final String DYNAMIC_UPDATE_JOB_NAME = "dynamicUpdateJob"; public static final String DYNAMIC_UPDATE_GROUP_NAME = "dynamicUpdateGroup"; public static final String DYNAMIC_UPDATE_CRONTINFO = "*/30 * * * * ?"; /** * 将任务加入任务调度中 * @param taskInfo * @param scheduler */ public static void addJob2Scheduler(TaskInfo taskInfo, Scheduler scheduler) { try { JobDetail jobDetail = generateJobDetail(taskInfo); if(jobDetail == null){ logger.error("create job failed!"); return; } Trigger triger = generateTrigger(taskInfo); if(triger == null){ logger.error("create trigger failed!"); return; } // 加载执行Job及定时器 scheduler.scheduleJob(jobDetail,triger); } catch (SchedulerException e) { logger.error("create scheduler error, error message: "+e.toString()); } } public static void addDynamicUpdateJob2Scheduler(Scheduler scheduler) { try { JobDetail jobDetail = generateDynamicUpdateJobDetail(DYNAMIC_UPDATE_JOB_NAME, DYNAMIC_UPDATE_GROUP_NAME); if(jobDetail == null){ logger.error("create job failed!"); return; } Trigger triger = generateTrigger(DYNAMIC_UPDATE_JOB_NAME, DYNAMIC_UPDATE_GROUP_NAME, DYNAMIC_UPDATE_CRONTINFO); if(triger == null){ logger.error("create trigger failed!"); return; } // 加载执行Job及定时器 scheduler.scheduleJob(jobDetail,triger); } catch (SchedulerException e) { logger.error("create scheduler error, error message: "+e.toString()); } } /** * 于信源信息生成对应的job * @param taskInfo * @return */ public static JobDetail generateJobDetail(TaskInfo taskInfo) { String jobName = taskInfo.getSourceName(); if(jobName.trim().length() == 0){ logger.error("job name is empty, please check!"); return null; } String jobGroup = taskInfo.getCategoryName(); 
if(jobGroup.trim().length() == 0){ logger.error("job group is empty, please check!"); return null; } return JobBuilder.newJob(ScheduleJob.class) .withIdentity(PrefixType.JOB_PREFIX+jobName, PrefixType.JOB_PREFIX+jobGroup) .requestRecovery() .usingJobData(CONF_INFO, new Gson().toJson(taskInfo)).build(); } public static JobDetail generateDynamicUpdateJobDetail(String jobName, String jobGroup) { if(jobName.trim().length() == 0){ logger.error("job name is empty, please check!"); return null; } if(jobGroup.trim().length() == 0){ logger.error("job group is empty, please check!"); return null; } return JobBuilder.newJob(DynamicUpdateJob.class) .withIdentity(PrefixType.JOB_PREFIX+jobName, PrefixType.JOB_PREFIX+jobGroup) .requestRecovery() .build(); } /** * 基于信源信息生成对应的trigger * @param taskInfo * @return */ public static Trigger generateTrigger(TaskInfo taskInfo) { String sourceTriggerName = taskInfo.getSourceName(); if(sourceTriggerName.trim().length() == 0){ logger.error("trigger name is empty, please check!"); return null; } String sourceTriggerGroup = taskInfo.getCategoryName(); if(sourceTriggerGroup.trim().length() == 0){ logger.error("trigger group is empty, please check!"); return null; } String cronInfo = taskInfo.getCronInfo(); if(cronInfo.trim().length() == 0){ logger.error("cron timer info is empty, please check!"); return null; } return TriggerBuilder.newTrigger().withIdentity(PrefixType.TRIGGER_PREFIX+sourceTriggerName, PrefixType.TRIGGER_PREFIX+sourceTriggerGroup) .withSchedule(CronScheduleBuilder.cronSchedule(cronInfo)) .build(); } public static Trigger generateTrigger(String sourceTriggerName, String sourceTriggerGroup, String cronInfo) { if(sourceTriggerName.trim().length() == 0){ logger.error("trigger name is empty, please check!"); return null; } if(sourceTriggerGroup.trim().length() == 0){ logger.error("trigger group is empty, please check!"); return null; } if(cronInfo.trim().length() == 0){ logger.error("cron timer info is empty, please check!"); return 
null; } return TriggerBuilder.newTrigger().withIdentity(PrefixType.TRIGGER_PREFIX+sourceTriggerName, PrefixType.TRIGGER_PREFIX+sourceTriggerGroup) .withSchedule(CronScheduleBuilder.cronSchedule(cronInfo)) .build(); } }
3.6 动态检测任务更新的Job
import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Lists; import com.google.gson.Gson; import com.quartz.consts.PrefixType; import com.quartz.module.TaskInfo; import org.quartz.*; import org.quartz.impl.matchers.GroupMatcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; @DisallowConcurrentExecution public class DynamicUpdateJob implements Job{ private static Logger logger = LoggerFactory.getLogger(DynamicUpdateJob.class); public DynamicUpdateJob(){} @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { JobDetail jobDetail = jobExecutionContext.getJobDetail(); JSONObject json = new JSONObject(); json.put("jobName", jobDetail.getKey().getName()); json.put("jobGroup", jobDetail.getKey().getGroup()); json.put("triggerName", jobExecutionContext.getTrigger().getKey().getName()); json.put("triggerGroup", jobExecutionContext.getTrigger().getKey().getGroup()); logger.info("job is running: "+json.toString()); // 获取当前的调度器 Scheduler scheduler = jobExecutionContext.getScheduler(); // 获取配置信息中的任务(注意需要保持) List<TaskInfo> confTaskInfos = GenerateTaskInfo.generateTaskInfoFromMysql(); // 获取所有的job信息 List<JobKey> schedulerJobKeys = acquireJobKeysWithinSceduler(scheduler); // 1. 
配置任务不存在,而sheduler相关任务存在,则进行下线处理 for(JobKey schedulerJobKey : schedulerJobKeys ){ boolean hasSameJobKeyInConfTask = false; for(TaskInfo confTaskInfo : confTaskInfos){ if(generateJobKey(confTaskInfo).equals(schedulerJobKey)){ hasSameJobKeyInConfTask = true; break; } } if(!hasSameJobKeyInConfTask){ try { scheduler.deleteJob(schedulerJobKey); logger.info("delete offline job: "+schedulerJobKey.toString()); } catch (SchedulerException e) { logger.error("delete offline job error: "+json.toString()); } } } // 2 配置任务与调度器任务比较 for(TaskInfo confTaskInfo : confTaskInfos){ JobKey confJobKey = generateJobKey(confTaskInfo); boolean hasSameJob = false; for(JobKey schedulerJobKey : schedulerJobKeys ){ if(confJobKey.equals(schedulerJobKey)){ hasSameJob = true; break; } } if(hasSameJob){ //具有相同名称的job logger.info("has same jobKey: "+confJobKey); JobDetail schedulerJobDetail = null; try { schedulerJobDetail = scheduler.getJobDetail(confJobKey); } catch (SchedulerException e) { logger.error("get job detail from scheduler error: "+confJobKey); } if(schedulerJobDetail == null) continue; // 1) 是否需要下线 if(!ScheduleJob.isNeedtoRun(confTaskInfo)){ try { logger.info("has same jobKey and offline the job "+confJobKey); scheduler.deleteJob(confJobKey); } catch (SchedulerException e) { logger.error("delete offline job error: "+confJobKey); } }else{ // 2) 是否需要更新任务 TaskInfo schedulerTaskInfo = parseTaskInfoFromJobDataMap(schedulerJobDetail); logger.info("confTaskInfo: " + confTaskInfo); logger.info("schedulerTaskInfo: " + schedulerTaskInfo); if(!confTaskInfo.equals(schedulerTaskInfo)){ try { logger.info("has same jobKey and update the job "+confJobKey); scheduler.deleteJob(confJobKey); SchedulerFactory.addJob2Scheduler(confTaskInfo, scheduler); } catch (SchedulerException e) { logger.error("update scheduler info error: "+confJobKey); } }else{ logger.info("the job info is same "+confJobKey); } } }else{ // 创建新的Job // 1) 是否满足上线的条件 if(!ScheduleJob.isNeedtoRun(confTaskInfo)){ logger.info("the status is 
offline, no need to create new job: "+confJobKey); continue; } logger.info("no same jobKey and create job "+confJobKey); // 2) 上线 SchedulerFactory.addJob2Scheduler(confTaskInfo, scheduler); } } } protected List<JobKey> acquireJobKeysWithinSceduler(Scheduler scheduler){ List<JobKey> jobKeys = Lists.newArrayList(); try { for(String groupName : scheduler.getJobGroupNames()){ if(groupName.equals(PrefixType.JOB_PREFIX+SchedulerFactory.DYNAMIC_UPDATE_GROUP_NAME)){ continue; } for(JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName))){ jobKeys.add(jobKey); } } } catch (SchedulerException e) { } return jobKeys; } protected TaskInfo parseTaskInfoFromJobDataMap(JobDetail jobDetail){ try { String confInfo = jobDetail.getJobDataMap().getString(SchedulerFactory.CONF_INFO); return new Gson().fromJson(confInfo, TaskInfo.class); } catch (Exception e) { logger.error("parse task info from JobDataMap error!"); return null; } } protected JobKey generateJobKey(TaskInfo taskInfo){ return generateJobKey(taskInfo.getSourceName(), taskInfo.getCategoryName()); } protected JobKey generateJobKey(String jobName, String jobGroup){ return JobKey.jobKey(PrefixType.JOB_PREFIX+jobName,PrefixType.JOB_PREFIX+jobGroup); } }
3.7 Es数据库存储
import com.alibaba.fastjson.JSONObject; import com.pipes.out.IDataOut; import com.quartz.conf.Configuration; import com.quartz.conf.OcpConfHelper; import com.quartz.util.IdBuilder; import com.quartz.util.OutputTypeTransform; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.List; import java.util.Map; /** * Created by songwang4 on 2017/6/7. 
*/ public class DataOut2ES implements IDataOut, IDataClose { static Logger logger = LoggerFactory.getLogger(DataOut2ES.class); static TransportClient client; String indexName; // 默认为ocp String typeName; String sourceName; List<String> indexBuildEles; List<String> idBuilderEles; Map<String,String> outputType; String providerName; public DataOut2ES(String indexName,String type){ this.indexName = indexName; this.typeName = type; init(); } public DataOut2ES(String indexName,String type, List<String> indexBuildEles){ this(indexName,type); this.indexBuildEles = indexBuildEles; } public DataOut2ES(String indexName,String type, List<String> idBuilderEles, List<String> indexBuildEles){ this(indexName,type, indexBuildEles); this.idBuilderEles = idBuilderEles; } public DataOut2ES(String indexName,String type, String sourceName, List<String> idBuilderEles, List<String> indexBuildEles){ this(indexName,type, idBuilderEles, indexBuildEles); this.sourceName = sourceName; } public DataOut2ES(String indexName,String type, String sourceName, List<String> idBuilderEles, List<String> indexBuildEles, Map<String,String> outputType,String providerName){ this(indexName,type,sourceName, idBuilderEles, indexBuildEles); this.outputType = outputType; this.providerName = providerName; } public static void init() { if(client == null){ Configuration conf = OcpConfHelper.getInstance().getOcpConf(); String esClusterName = conf.getStringValue("ocp_es_cluster_name", ""); String esIp = conf.getStringValue("ocp_es_ip", ""); int esPort = conf.getIntValue("ocp_es_port", ""); Settings settings = Settings.builder() .put("cluster.name",esClusterName) .put("client.transport.sniff", true) .put("client.transport.ping_timeout", "120s") .put("client.transport.nodes_sampler_interval","30s").build(); try { client = new PreBuiltTransportClient(settings) .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esIp),esPort)); } catch (UnknownHostException e) { e.printStackTrace(); } } } /** * 批量写入 * 
@param datas */ public void save(List<JSONObject> datas) { // 批量的插入数据 BulkRequestBuilder bulkRequest = client.prepareBulk(); for(JSONObject data : datas){ //按输出字段类型进行转换 // data = OutputTypeTransform.transform(data,outputType); String id64 = IdBuilder.generateId(this.sourceName, data, this.idBuilderEles); if(id64.trim().length() == 0) continue; JSONObject indexJson = new JSONObject(); for(String indexBuildEle : this.indexBuildEles){ if(data.containsKey(indexBuildEle)){ indexJson.put(indexBuildEle, data.get(indexBuildEle)); } } if(indexJson.keySet().isEmpty()){ logger.info("no json fields, so no need to save"); return; } bulkRequest.add(client.prepareIndex(indexName, typeName, id64).setSource(indexJson.toString())); } BulkResponse bulkResponse = bulkRequest.execute().actionGet(); if(bulkResponse.hasFailures()){ logger.error("insert data 2 es error "+indexName); System.out.println(bulkResponse.buildFailureMessage()); } } public void saveWithoutIndexBuilds(List<JSONObject> datas) { // 批量的插入数据 BulkRequestBuilder bulkRequest = client.prepareBulk(); for(JSONObject data : datas){ bulkRequest.add(client.prepareIndex(indexName, typeName).setSource(data.toString())); } BulkResponse bulkResponse = bulkRequest.execute().actionGet(); if(bulkResponse.hasFailures()){ logger.error("insert data 2 es error "+indexName); System.out.println(bulkResponse.buildFailureMessage()); } } public void saveWithoutIndexBuilds2(List<JSONObject> datas) { // 批量的插入数据 BulkRequestBuilder bulkRequest = client.prepareBulk(); for(JSONObject data : datas){ String _id = data.getString("_id"); JSONObject source = data.getJSONObject("_source"); bulkRequest.add(client.prepareIndex(indexName, typeName,_id).setSource(source.toString())); } BulkResponse bulkResponse = bulkRequest.execute().actionGet(); if(bulkResponse.hasFailures()){ logger.error("insert data 2 es error "+indexName); System.out.println(bulkResponse.buildFailureMessage()); } } /** * 判断索引是否存在 * @param indexName * @return */ public boolean 
isExistsIndex(String indexName){ IndicesExistsResponse response = client.admin().indices() .exists(new IndicesExistsRequest().indices(new String[]{indexName})).actionGet(); return response.isExists(); } /** * 创建索引信息 * @param indexName * @return */ public boolean createIndex(String indexName){ try { CreateIndexResponse indexResponse = this.client .admin() .indices() .prepareCreate(indexName) .get(); return indexResponse.isAcknowledged(); } catch (ElasticsearchException e) { e.printStackTrace(); } return false; } @Override public void save(Object data) { if(this.indexBuildEles.size() == 0){ logger.error("index fields are empty in es, no index need to save, info: " + data.toString()); return; } // 逐条插入数据 JSONObject json = null; try { json = (JSONObject)data; } catch (Exception e) { logger.error("trans data to json error in es :" + data.toString()); return; } if(json == null){ logger.error("trans data to Json error in es, info " + data.toString()); return; } // json = OutputTypeTransform.transform(json,outputType); // 构建索引id String id64 = IdBuilder.generateId(this.sourceName, json, this.idBuilderEles); if(id64.trim().length() == 0){ logger.error("generate 64 bit id is null,please check: " + data.toString()); return; } JSONObject indexJson = new JSONObject(); for(String indexBuildEle : this.indexBuildEles){ if(json.containsKey(indexBuildEle)){ indexJson.put(indexBuildEle, json.get(indexBuildEle)); } } if(indexJson.keySet().isEmpty()){ logger.info("no json fields, so no need to save"); return; } logger.info("index info: "+indexJson); IndexResponse response = client.prepareIndex(this.indexName, this.typeName, id64).setSource(indexJson.toString()).get(); if(response.status() != RestStatus.CREATED && response.status() != RestStatus.OK){ logger.error("index error in es, status is "+response.status().getStatus()+"info: " + data.toString()); return; } } @Override public void close() { } }
以上代码涵盖了与Quartz相关的整体流程。虽然配置类、数据库初始化/加载类以及部分帮助类等细节代码没有展示,但Quartz的核心用法已可略窥一二。如有问题,欢迎留言交流。
4. 集群模式
注意:上述代码默认使用Quartz集群模式。主流程所加载的quartz.properties中的集群配置如下,可供参考。
#============================================================================
# Configure Main Scheduler Properties
#============================================================================
org.quartz.scheduler.instanceName: OcpScheduler
# With isClustered=true every node must have its own instanceId, even though all nodes
# share this file and the same instanceName. AUTO lets Quartz generate a unique id per node.
org.quartz.scheduler.instanceId: AUTO
org.quartz.scheduler.skipUpdateCheck: true

#============================================================================
# Configure ThreadPool
#============================================================================
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 50
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true

#============================================================================
# Configure JobStore
#============================================================================
org.quartz.jobStore.misfireThreshold: 120000
org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties: false
org.quartz.jobStore.tablePrefix: QRTZ_
org.quartz.jobStore.dataSource: ocpQzDs
org.quartz.jobStore.isClustered: true
org.quartz.jobStore.clusterCheckinInterval = 60000

#============================================================================
# Configure Datasources
#============================================================================
org.quartz.dataSource.ocpQzDs.driver: com.mysql.jdbc.Driver
org.quartz.dataSource.ocpQzDs.URL:jdbc:mysql://192.168.1.1:3306/test?useUnicode=true&characterEncoding=utf-8
org.quartz.dataSource.ocpQzDs.user: test
org.quartz.dataSource.ocpQzDs.password: test
# The documented Quartz key is maxConnections (plural); "maxConnection" is not a Quartz property.
org.quartz.dataSource.ocpQzDs.maxConnections: 30

#============================================================================
# Configure Plugins
#============================================================================
org.quartz.plugin.shutdownHook.class: org.quartz.plugins.management.ShutdownHookPlugin
org.quartz.plugin.shutdownHook.cleanShutdown: true
#org.quartz.plugin.triggHistory.class: org.quartz.plugins.history.LoggingJobHistoryPlugin
转载于:https://www.cnblogs.com/mengrennwpu/p/7900316.html
最后
以上就是完美秋天为你收集整理的Quartz使用(6) - Quartz项目实战的全部内容,希望文章能够帮你解决Quartz使用(6) - Quartz项目实战所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复