概述
主要涉及类:一系列JobHandler用于job的处理、JobManager用于job的入库调度激活挂起等操作、
asyncExecutor用于job的异步执行(通过线程池实现、和3个死循环的线程用于扫描job表操作调度job)
首先查看引擎配置初始化类ProcessEngineConfigurationImpl的init方法:
其中调用了
//------------作业处理器相关---------------
initJobHandlers();//作业处理器集合(异步任务、定时触发任务、开始事件、定时挂起激活流程事件等、自定义等处理器)
initJobManager();//job任务管理器(用于创建或调度各种job的操作)
initAsyncExecutor();//异步执行器(基于ThreadPoolExecutor线程池和BlockingQueue阻塞队列)
第一个initJobHandlers(),用于各种任务处理器放入一个map中,方便使用时候通过key类型去获取。
第二个initJobManager(),进行JobManager的初始化,用于异步job、timerJob的创建和调度、挂起等操作。
异步job通过异步执行器去执行。
第三个initAsyncExecutor(),初始化异步执行器(通过线程池实现)
public void initAsyncExecutor() {
if (asyncExecutor == null) {
DefaultAsyncJobExecutor defaultAsyncExecutor = new DefaultAsyncJobExecutor();
// Message queue mode 队列模式 默认false
defaultAsyncExecutor.setMessageQueueMode(asyncExecutorMessageQueueMode);
// Thread pool config
defaultAsyncExecutor.setCorePoolSize(asyncExecutorCorePoolSize);//核心线程数
defaultAsyncExecutor.setMaxPoolSize(asyncExecutorMaxPoolSize);//最大线程数
defaultAsyncExecutor.setKeepAliveTime(asyncExecutorThreadKeepAliveTime);//保持存活时间
// Threadpool queue
if (asyncExecutorThreadPoolQueue != null) {
defaultAsyncExecutor.setThreadPoolQueue(asyncExecutorThreadPoolQueue);//阻塞队列
}
defaultAsyncExecutor.setQueueSize(asyncExecutorThreadPoolQueueSize);//阻塞队列大小
// Acquisition wait time
defaultAsyncExecutor.setDefaultTimerJobAcquireWaitTimeInMillis(asyncExecutorDefaultTimerJobAcquireWaitTime);//设置作业计时器查询间隔默认的等待时间10s
defaultAsyncExecutor.setDefaultAsyncJobAcquireWaitTimeInMillis(asyncExecutorDefaultAsyncJobAcquireWaitTime);//异步作业查询间隔默认等待时间,默认10s
// Queue full wait time
defaultAsyncExecutor.setDefaultQueueSizeFullWaitTimeInMillis(asyncExecutorDefaultQueueSizeFullWaitTime);//队列已满时的等待时间默认0s
// Job locking
defaultAsyncExecutor.setTimerLockTimeInMillis(asyncExecutorTimerLockTimeInMillis);//定时执行器锁时间,默认5分钟
defaultAsyncExecutor.setAsyncJobLockTimeInMillis(asyncExecutorAsyncJobLockTimeInMillis);//异步作业执行器锁时间,默认5分钟
if (asyncExecutorLockOwner != null) {//执行器锁所有者
defaultAsyncExecutor.setLockOwner(asyncExecutorLockOwner);
}
// Reset expired
defaultAsyncExecutor.setResetExpiredJobsInterval(asyncExecutorResetExpiredJobsInterval);//重制过期任务间隔
defaultAsyncExecutor.setResetExpiredJobsPageSize(asyncExecutorResetExpiredJobsPageSize);//重置过期任务查询条数
// Shutdown
defaultAsyncExecutor.setSecondsToWaitOnShutdown(asyncExecutorSecondsToWaitOnShutdown);//关闭前等待时间
asyncExecutor = defaultAsyncExecutor;
}
asyncExecutor.setProcessEngineConfiguration(this);
asyncExecutor.setAutoActivate(asyncExecutorActivate);//是否自动激活,默认true
}
且在ProcessEngineImpl的构造方法里进行了asyncExecutor线程的启动:
public ProcessEngineImpl(ProcessEngineConfigurationImpl processEngineConfiguration) {
//服务及配置注入
this.processEngineConfiguration = processEngineConfiguration;
this.name = processEngineConfiguration.getProcessEngineName();//默认为:default
this.repositoryService = processEngineConfiguration.getRepositoryService();
this.runtimeService = processEngineConfiguration.getRuntimeService();
this.historicDataService = processEngineConfiguration.getHistoryService();
this.identityService = processEngineConfiguration.getIdentityService();
this.taskService = processEngineConfiguration.getTaskService();
this.formService = processEngineConfiguration.getFormService();
this.managementService = processEngineConfiguration.getManagementService();
this.dynamicBpmnService = processEngineConfiguration.getDynamicBpmnService();
this.asyncExecutor = processEngineConfiguration.getAsyncExecutor();
this.commandExecutor = processEngineConfiguration.getCommandExecutor();
this.sessionFactories = processEngineConfiguration.getSessionFactories();
this.transactionContextFactory = processEngineConfiguration.getTransactionContextFactory();
this.formEngineRepositoryService = processEngineConfiguration.getFormEngineRepositoryService();
this.formEngineFormService = processEngineConfiguration.getFormEngineFormService();
if (processEngineConfiguration.isUsingRelationalDatabase() && processEngineConfiguration.getDatabaseSchemaUpdate() != null) {
//数据库脚本检查,根据配置的schema策略
commandExecutor.execute(processEngineConfiguration.getSchemaCommandConfig(), new SchemaOperationsProcessEngineBuild());
}
if (name == null) {
log.info("default activiti ProcessEngine created");
} else {
log.info("ProcessEngine {} created", name);
}
ProcessEngines.registerProcessEngine(this);//放入引擎缓存map(name:engine实例)
if (asyncExecutor != null && asyncExecutor.isAutoActivate()) {//异步执行器状态激活,默认true
asyncExecutor.start();//启动各种线程池和线程
}
if (processEngineConfiguration.getProcessEngineLifecycleListener() != null) {//引擎被创建监听器触发
processEngineConfiguration.getProcessEngineLifecycleListener().onProcessEngineBuilt(this);
}
//更新缓存:实例表覆盖原始实例表bpmnModel缓存
initModel(managementService);
processEngineConfiguration.getEventDispatcher().dispatchEvent(ActivitiEventBuilder.createGlobalEvent(ActivitiEventType.ENGINE_CREATED));
}
接下来我们查看DefaultAsyncJobExecutor的start及相关方法源码:
public boolean executeAsyncJob(final Job job) {
if (isMessageQueueMode) {//使用基于消息队列的作业执行器运行时,这里不执行作业。
// When running with a message queue based job executor,
// the job is not executed here.
return true;
}
Runnable runnable = null;
if (isActive) {
//基于job创建线程
runnable = createRunnableForJob(job);
try {
executorService.execute(runnable);//放入线程池执行
} catch (RejectedExecutionException e) {
......
}
}
public void start() {
if (isActive) {//默认false未激活
return;
}
log.info("Starting up the default async job executor [{}].", getClass().getName());
if (timerJobRunnable == null) {//timerJob线程(负责查询最新(<=now)一条的timerJob表数据并放入job表(所有线程都起来后也就是isActive=true后会涉及加锁set锁过期时间操作),删除timerJob表)
timerJobRunnable = new AcquireTimerJobsRunnable(this, processEngineConfiguration.getJobManager());
}
if (resetExpiredJobsRunnable == null) {//new 重置过期job线程(查持有锁时间过期的job,并进行锁撤销:
// 6版本:删除旧的job,插入新的job(新的不设置lock过期时间和id,lock持有者等);5版本:直接更新)
resetExpiredJobsRunnable = new ResetExpiredJobsRunnable(this);
}
if (!isMessageQueueMode && asyncJobsDueRunnable == null) {
asyncJobsDueRunnable = new AcquireAsyncJobsDueRunnable(this);
}
if (!isMessageQueueMode) {//非消息队列模式
initAsyncJobExecutionThreadPool();//初始化异步线程池
startJobAcquisitionThread();//启动异步可执行作业线程(开始真正的执行)
}
startTimerAcquisitionThread();//启动timerJob作业获取线程(作业搬运到job)
startResetExpiredJobsThread();//启动重置过期job线程
isActive = true;//设置为异步作业执行器为已激活
executeTemporaryJobs();//执行临时作业
}
//初始化一个线程池
protected void initAsyncJobExecutionThreadPool() {
if (threadPoolQueue == null) {
log.info("Creating thread pool queue of size {}", queueSize);
threadPoolQueue = new ArrayBlockingQueue<Runnable>(queueSize);
}
if (executorService == null) {
log.info("Creating executor service with corePoolSize {}, maxPoolSize {} and keepAliveTime {}", corePoolSize, maxPoolSize, keepAliveTime);
BasicThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern("activiti-async-job-executor-thread-%d").build();
executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, threadPoolQueue, threadFactory);
}
}
//启动异步线程池
protected void startJobAcquisitionThread() {
if (asyncJobAcquisitionThread == null) {
asyncJobAcquisitionThread = new Thread(asyncJobsDueRunnable);
}
asyncJobAcquisitionThread.start();
}
//启动timerJob线程
protected void startTimerAcquisitionThread() {
if (timerJobAcquisitionThread == null) {
timerJobAcquisitionThread = new Thread(timerJobRunnable);
}
timerJobAcquisitionThread.start();
}
//启动重置过期job线程
protected void startResetExpiredJobsThread() {
if (resetExpiredJobThread == null) {
resetExpiredJobThread = new Thread(resetExpiredJobsRunnable);
}
resetExpiredJobThread.start();
}
该类主要职责:
1、进行了线程池的创建和启动、线程放入线程池执行
2、启动AcquireTimerJobsRunnable 线程(timerJob作业搬运到job):
timerJob线程(负责查询最新(<=now())一条的timerJob表数据并放入job表(所有线程都起来后也就是isActive=true后会涉及加锁set锁过期时间操作),删除timerJob表)
3、启动ResetExpiredJobsRunnable重置过期job线程
重置过期job线程:(查持有锁时间过期的job,并进行锁撤销:
6版本:删除旧的job,插入新的job(新的不设置lock过期时间和id,lock持有者等);
5版本:直接更新)
4、启动AcquireAsyncJobsDueRunnable该线程里开始job真正的执行,通过步骤1异步处理器的线程池去执行线程
:该步骤在线程池里最终调用了JobManager().execute(job);方法
该方法如下:
public void execute(Job job) {
if (job instanceof JobEntity) {
if (Job.JOB_TYPE_MESSAGE.equals(job.getJobType())) {//message
executeMessageJob((JobEntity) job);
} else if (Job.JOB_TYPE_TIMER.equals(job.getJobType())) {//timer
executeTimerJob((JobEntity) job);
}
} else {
throw new ActivitiException("Only jobs with type JobEntity are supported to be executed");
}
}
之后通过作业处理器集合取到相应的处理器进行了job的执行 。
如下代码:
Map<String, JobHandler> jobHandlers = processEngineConfiguration.getJobHandlers();
JobHandler jobHandler = jobHandlers.get(jobEntity.getJobHandlerType());
jobHandler.execute(jobEntity, jobEntity.getJobHandlerConfiguration(), execution, getCommandContext());
综上:
1、init用于jobHandlerMap、jobManager、asyncExecutor异步处理器的初始化
2、jobManager用于job的创建、挂起、数据库操作等。
3、AsyncExecutor异步处理器通过线程池和多个线程去扫表并进行job的调度,通过线程池去执行,并最终通过jobManager的的execute调用job对应的具体jobHandler去处理执行job。
说明:异步处理器中线程的启动在ProcessEngineImpl构造方法。
待补充....
Activiti社区交流群:839915498
更多activiti系列教程:activiti教程
最后
以上就是俏皮小霸王为你收集整理的activiti源码解析系列2 - 引擎作业处理器源码分析Activiti社区交流群:839915498的全部内容,希望文章能够帮你解决activiti源码解析系列2 - 引擎作业处理器源码分析Activiti社区交流群:839915498所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复