@ComponentScan(value = "com.wang") @Configuration @EnableAsync public class AppConfig { }
@Service public class CycleService2 { @Autowired private CycleService1 cycleService1; @Async public void alsoDo() { System.out.println("create cycleService2"); } }
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(AsyncConfigurationSelector.class) public @interface EnableAsync { /** * Indicate the 'async' annotation type to be detected at either class * or method level. * <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1 * {@code @javax.ejb.Asynchronous} annotation will be detected. * <p>This attribute exists so that developers can provide their own * custom annotation type to indicate that a method (or all methods of * a given class) should be invoked asynchronously. * 此处说明的是方法执行变成异步,扫描的是哪个注解,目前默认的是Async和Asynchronous,开发者也可以自定义 */ Class<? extends Annotation> annotation() default Annotation.class; /** * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed * to standard Java interface-based proxies. * <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>. * <p>The default is {@code false}. * <p>Note that setting this attribute to {@code true} will affect <em>all</em> * Spring-managed beans requiring proxying, not just those marked with {@code @Async}. * For example, other beans marked with Spring's {@code @Transactional} annotation * will be upgraded to subclass proxying at the same time. This approach has no * negative impact in practice unless one is explicitly expecting one type of proxy * vs. another — for example, in tests. * 如何proxyTargetClass被设置成true,那么spring的所有proxy都会通过CGLIB方式实现,不再使用Java默认的基于接口的代理实现方式;而且此处如果设置,不仅仅是会影响添加了@Async注解的类的proxy方式,加了@Transactional的类也会变成CGLIB代理,不推荐修改;这个注解只有mode是默认的PROXY,才有意义 */ boolean proxyTargetClass() default false; /** * Indicate how async advice should be applied. * <p><b>The default is {@link AdviceMode#PROXY}.</b> * Please note that proxy mode allows for interception of calls through the proxy * only. Local calls within the same class cannot get intercepted that way; an * {@link Async} annotation on such a method within a local call will be ignored * since Spring's interceptor does not even kick in for such a runtime scenario. * For a more advanced mode of interception, consider switching this to * {@link AdviceMode#ASPECTJ}. * 代理方式的不同,默认的是使用Spring的proxy方式,也可以换成原生的AspectJ的proxy方式。 * 这两个的区别作用还是很明显的 */ AdviceMode mode() default AdviceMode.PROXY; /** * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor} * should be applied. * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run * after all other post-processors, so that it can add an advisor to * existing proxies rather than double-proxy. * 因为在beanPostProcessor执行的时候,会根据order值进行排序,此处设置为最低值,就是想让其最后执行 * 其实即使不设置这个值,因为AsyncAnnotationBeanPostProcessor继承了ProxyProcessorSupport,ProxyProcessorSupport中的order默认也是最小优先级 * */ int order() default Ordered.LOWEST_PRECEDENCE; }
2. AsyncConfigurationSelector的作用
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> { private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"; /** * Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration} * for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()}, * respectively. */ @Override @Nullable public String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY: return new String[] {ProxyAsyncConfiguration.class.getName()}; case ASPECTJ: return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME}; default: return null; } } }
3. adviceMode:PROXY(默认值)
3.1 ProxyAsyncConfiguration
@Configuration @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration { @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public AsyncAnnotationBeanPostProcessor asyncAdvisor() { Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected"); AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor(); bpp.configure(this.executor, this.exceptionHandler); Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation"); if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass")); bpp.setOrder(this.enableAsync.<Integer>getNumber("order")); return bpp; } }
3.2 AsyncAnnotationBeanPostProcessor
public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor { // 删除了一些无关紧要,或者默认不会设置的属性 public AsyncAnnotationBeanPostProcessor() { setBeforeExistingAdvisors(true); } /** * 因为AsyncAnnotationBeanPostProcessor实现了BeanFactoryAware接口 * 所以在实例化的过程中执行到initializeBean步骤的时候,里面第一步就是执行各种实现了Aware接口的接口方法 * 在此处new了一个advisor。advisor简单理解就是:advice+pointcut * @param beanFactory */ @Override public void setBeanFactory(BeanFactory beanFactory) { super.setBeanFactory(beanFactory); AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler); if (this.asyncAnnotationType != null) { advisor.setAsyncAnnotationType(this.asyncAnnotationType); } advisor.setBeanFactory(beanFactory); this.advisor = advisor; } }
3.3 AsyncAnnotationAdvisor
public AsyncAnnotationAdvisor( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2); /** * 这儿设置符合pointCut需要的注解 * 此处的executor就是一个扩展点,如果不想用spring的默认单线程线程池,可以自定义一个线程池 * exceptionHandler,顾名思义,就是我们的方法在线程池中执行时抛出exception该如何handle使用的 * advice也就是咱们的interceptor * pointCut就不多解释了,就是把设置符合什么条件会进行interceptor的invoke方法 */ asyncAnnotationTypes.add(Async.class); try { asyncAnnotationTypes.add((Class<? extends Annotation>) ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader())); } catch (ClassNotFoundException ex) { // If EJB 3.1 API not present, simply ignore. } this.advice = buildAdvice(executor, exceptionHandler); this.pointcut = buildPointcut(asyncAnnotationTypes); }
3.3.1 buildAdvice
protected Advice buildAdvice( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) { // new了一个interceptor AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null); interceptor.configure(executor, exceptionHandler); return interceptor; }
可以看到advice主要就是定义了一个烂机器interceptor,在方法执行的时候进行一些拦截,至于executor,是方法执行器,默认为null,exceptionHandler也默认是null。 AnnotationAsyncExecutionInterceptor,异步执行的原理
/** * Intercept the given method invocation, submit the actual calling of the method to * the correct task executor and return immediately to the caller. * @param invocation the method to intercept and make asynchronous * @return {@link Future} if the original method returns {@code Future}; {@code null} * otherwise. */ @Override @Nullable public Object invoke(final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); /**获取一个任务执行器 * 1. 从@Async注解里面获取配置的任务执行器 * 2. 从Spring容器中找TaskExecutor类的bean * 3. 从spring容器中获取名为"taskExecutor"的bean, * 4. 如果还没有,new SimpleAsyncTaskExecutor()) */ AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null) { throw new IllegalStateException( "No executor specified and no default executor set on AsyncExecutionInterceptor either"); } //将当前方法执行封装成一个callable对象,然后放入到线程池里 Callable<Object> task = () -> { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null; }; //任务提交 return doSubmit(task, executor, invocation.getMethod().getReturnType()); }
- 寻找任务执行器:
- 从@Async注解里面获取配置的任务执行器
- 从Spring容器中找TaskExecutor类的bean
- 从spring容器中获取名为"taskExecutor"的bean,
- 如果还没有,new SimpleAsyncTaskExecutor())可以看到其实我们是可以给@Async进行任务执行器的配置的。
- 将具体的方法封装成callable的对象,然后doSubmit
- 此处我们就看一下默认的doSumit,使用的SimpleAsyncTaskExecutor是如何实现的
- 最终会执行到下面这个doExecute方法,默认情况下threadFactory是null,所以默认情况下,我们的方法,每次都是被创建了一个新的守护线程来进行方法的执行。
protected void doExecute(Runnable task) { Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task)); thread.start(); } 自定义任务执行器
- 可以在配置类里new SimpleAsyncTaskExecutor(),然后setThreadFactory,这样修改了默认线程的产生方式
- 比较主流的方式是,定义一个ThreadPoolTaskExecutor,也就是线程池任务执行器,可以进行线程复用
3.3.2 buildPointcut
/** * Calculate a pointcut for the given async annotation types, if any. * @param asyncAnnotationTypes the async annotation types to introspect * @return the applicable Pointcut object, or {@code null} if none */ protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) { ComposablePointcut result = null; for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) { // 就是根据这两个匹配器进行匹配的 // 检查类上是否有@Async注解 Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true); //检查方法上是否有@Async注解 Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true); if (result == null) { result = new ComposablePointcut(cpc); } else { // 取并集:类上加了@Async或者类的方法上加了@Async result.union(cpc); } result = result.union(mpc); } return (result != null ? result : Pointcut.TRUE); }
4 什么时候判断进行advice的添加呢
3.4.1 AsyncAnnotationBeanPostProcessor#postProcessAfterInitialization
@Override public Object postProcessAfterInitialization(Object bean, String beanName) { // 没有通知,或者是AOP的基础设施类,那么不进行代理 if (this.advisor == null || bean instanceof AopInfrastructureBean) { // Ignore AOP infrastructure such as scoped proxies. return bean; } // 对已经被代理的类,不再生成代理,只是将通知添加到代理类的逻辑中 // 这里通过beforeExistingAdvisors决定是将通知添加到所有通知之前还是添加到所有通知之后 // 在使用@Async注解的时候,beforeExistingAdvisors被设置成了true, // @Async注解之所以把beforeExistingAdvisors设置为true,是因为该advisor和其他的advisor差别太大了,从情理上讲,也应该第一个执行 // 意味着整个方法及其拦截逻辑都会异步执行 if (bean instanceof Advised) { Advised advised = (Advised) bean; // 判断bean是否符合该advisor的使用范围,通过pointcut来判断 if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) { // Add our local Advisor to the existing proxy's Advisor chain... if (this.beforeExistingAdvisors) { advised.addAdvisor(0, this.advisor); } else { advised.addAdvisor(this.advisor); } return bean; } } // 如果还不是一个代理类,也需要通过eligible来判断是否符合使用该advisor的条件 if (isEligible(bean, beanName)) { ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName); if (!proxyFactory.isProxyTargetClass()) { evaluateProxyInterfaces(bean.getClass(), proxyFactory); } proxyFactory.addAdvisor(this.advisor); customizeProxyFactory(proxyFactory); return proxyFactory.getProxy(getProxyClassLoader()); } // No proxy needed. return bean; }
protected boolean isEligible(Class<?> targetClass) { Boolean eligible = this.eligibleBeans.get(targetClass); if (eligible != null) { return eligible; } if (this.advisor == null) { return false; } // 其实就是判断类是否可以进行添加该advisor,也就是判断是否符合该advisor的使用条件 // 就是把advisor的pointCut拿出来,pointCut里的classMatcher和methodMatcher拿出来对类及其方法进行判断 eligible = AopUtils.canApply(this.advisor, targetClass); this.eligibleBeans.put(targetClass, eligible); return eligible; }
具体的AopUtils.canApply(this.advisor, targetClass)逻辑就不写了,就是根据pointcut里设置的classFilter和methodMatcher类判断当前bean的class是否需要进行该advisor的使用。
