当前位置:   article > 正文

Spring定时任务使用线程池及源码探索_no taskscheduler/scheduledexecutorservice bean fou

no taskscheduler/scheduledexecutorservice bean found for scheduled processin






其中如果taskScheduler 为空,会使用单线程的定时任务执行器

  1. protected void scheduleTasks() {
  2. if (this.taskScheduler == null) {
  3. // 这里是 使用一个单线程的 ScheduledExecutor
  4. this.localExecutor = Executors.newSingleThreadScheduledExecutor();
  5. this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
  6. }
  7. if (this.triggerTasks != null) {
  8. for (TriggerTask task : this.triggerTasks) {
  9. addScheduledTask(scheduleTriggerTask(task));
  10. }
  11. }
  12. if (this.cronTasks != null) {
  13. for (CronTask task : this.cronTasks) {
  14. addScheduledTask(scheduleCronTask(task));
  15. }
  16. }
  17. if (this.fixedRateTasks != null) {
  18. for (IntervalTask task : this.fixedRateTasks) {
  19. addScheduledTask(scheduleFixedRateTask(task));
  20. }
  21. }
  22. if (this.fixedDelayTasks != null) {
  23. for (IntervalTask task : this.fixedDelayTasks) {
  24. addScheduledTask(scheduleFixedDelayTask(task));
  25. }
  26. }
  27. }


在使用定时任务时我们使用了注解@EnableScheduling, 这个注解可以看到,它会@Import 一个 SchedulingConfiguration 配置类

  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Import(SchedulingConfiguration.class)
  4. @Documented
  5. public @interface EnableScheduling {
  6. }

SchedulingConfiguration 配置类,会加载一个ScheduledAnnotationBeanPostProcessor 的对象。

  1. @Configuration
  2. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  3. public class SchedulingConfiguration {
  4. @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
  5. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  6. public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
  7. return new ScheduledAnnotationBeanPostProcessor();
  8. }
  9. }

它的构造方法中会创建一个ScheduledTaskRegistrar ,同时它是实现了InitializingBean 接口的类

  1. public ScheduledAnnotationBeanPostProcessor() {
  2. this.registrar = new ScheduledTaskRegistrar();
  3. }

类初始化的时候会调用到afterPropertiesSet 方法,进而调用到了scheduleTasks

  1. @Override
  2. public void afterPropertiesSet() {
  3. // 定时任务的注册
  4. scheduleTasks();
  5. }



  1. 配置 SchedulingConfigurer
  2. 使用@Async


spring scheduled单线程和多线程使用过程中的大坑!!不看到时候绝对后悔!!_FlyingSnails的博客-CSDN博客_scheduled 单线程


方案中提到的使用自己的配置SchedulingConfigurer 的方式有一定的弊端,说如果同一个Schedule 任务,执行使用的线程是同一个线程,前一个任务执行超过任务间隔,会影响后一个任务的执行。



因为 ScheduledAnnotationBeanPostProcessor 实现了 ApplicationListener ,所以在 spring 容器 refresh() 的时候调用到

  1. @Override
  2. public void onApplicationEvent(ContextRefreshedEvent event) {
  3. if (event.getApplicationContext() == this.applicationContext) {
  4. // Running in an ApplicationContext -> register tasks this late...
  5. // giving other ContextRefreshedEvent listeners a chance to perform
  6. // their work at the same time (e.g. Spring Batch's job registration).
  7. finishRegistration();
  8. }
  9. }
  10. private void finishRegistration() {
  11. if (this.scheduler != null) {
  12. this.registrar.setScheduler(this.scheduler);
  13. }
  14. if (this.beanFactory instanceof ListableBeanFactory) {
  15. Map<String, SchedulingConfigurer> beans =
  16. // 从 ioc 容器中获取 类型为 SchedulingConfigurer 的bean
  17. ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
  18. List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
  19. AnnotationAwareOrderComparator.sort(configurers);
  20. for (SchedulingConfigurer configurer : configurers) {
  21. configurer.configureTasks(this.registrar);
  22. }
  23. }
  24. }

可以看到上面代码中 beanFactory.getBeansOfType(SchedulingConfigurer.class) 获取配置的实现了 SchedulingConfigurer 类型的bean对象

所以这里要注意:实现 SchedulingConfigurer 接口的类必须要被实例化一个 Bean 对象,否则无法找到对应的配置信息。

题外话:继续看 finishRegistration 方法,会发现如果我们配置了任务但 registrar.getScheduler() 为空的时候,spring会先尝试找 TaskScheduler 类型的bean

  1. 如果发现有多个,按名称再找一个出来,名字匹配不出来就结束
  2. 如果发现没有,再尝试找 ScheduledExecutorService 类型的bean
    1. 如果发现有多个,按名称再找一个出来,名字匹配不出来就结束
    2. 如果发现有多个,查找结束


  1. if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
  2. Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
  3. try {
  4. // Search for TaskScheduler bean...
  5. // 走类型查找 TaskScheduler
  6. this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
  7. }
  8. catch (NoUniqueBeanDefinitionException ex) {
  9. // 没有唯一的bean
  10. if (logger.isTraceEnabled()) {
  11. logger.trace("Could not find unique TaskScheduler bean - attempting to resolve by name: " +
  12. ex.getMessage());
  13. }
  14. try {
  15. // 根据类名找
  16. this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
  17. }
  18. catch (NoSuchBeanDefinitionException ex2) {
  19. if (logger.isInfoEnabled()) {
  20. logger.info("More than one TaskScheduler bean exists within the context, and " +
  21. "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
  22. "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
  23. "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
  24. ex.getBeanNamesFound());
  25. }
  26. }
  27. }
  28. catch (NoSuchBeanDefinitionException ex) {
  29. if (logger.isTraceEnabled()) {
  30. logger.trace("Could not find default TaskScheduler bean - attempting to find ScheduledExecutorService: " +
  31. ex.getMessage());
  32. }
  33. // Search for ScheduledExecutorService bean next...
  34. try {
  35. // 走类型查找 ScheduledExecutorService
  36. this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
  37. }
  38. catch (NoUniqueBeanDefinitionException ex2) {
  39. // 多个 bean
  40. if (logger.isTraceEnabled()) {
  41. logger.trace("Could not find unique ScheduledExecutorService bean - attempting to resolve by name: " +
  42. ex2.getMessage());
  43. }
  44. try {
  45. // 根据类名找
  46. this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
  47. }
  48. catch (NoSuchBeanDefinitionException ex3) {
  49. if (logger.isInfoEnabled()) {
  50. logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
  51. "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
  52. "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
  53. "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
  54. ex2.getBeanNamesFound());
  55. }
  56. }
  57. }
  58. catch (NoSuchBeanDefinitionException ex2) {
  59. if (logger.isTraceEnabled()) {
  60. logger.trace("Could not find default ScheduledExecutorService bean - falling back to default: " +
  61. ex2.getMessage());
  62. }
  63. // Giving up -> falling back to default scheduler within the registrar...
  64. logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
  65. }
  66. }
  67. }

由此可见,如果是自己配置一个 TaskScheduler,类型的bean对象其实也能实现线程池的方式,不一定需要一个bean 实现SchedulingConfigurer 接口。

所以有结论,配置bean 的方式实现多线程其实有 两种方式:

一、配置实现SchedulingConfigurer 的bean

二、配置实现TaskScheduler 的bean


请特别留意一下 org.springframework.scheduling.concurrent.ReschedulingRunnable 类的 run 方法和schedule 方法。

当org.springframework.scheduling.config.ScheduledTaskRegistrar#scheduleTasks 被调用的时候,会进行task 的注册,然后调用到org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler#schedule 方法,它会返回ScheduledFuture ,他其实是 new 了一个 ReschedulingRunnable 并且调用了schedule。

new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();
  1. @Nullable
  2. public ScheduledFuture<?> schedule() {
  3. synchronized (this.triggerContextMonitor) {
  4. this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
  5. if (this.scheduledExecutionTime == null) {
  6. return null;
  7. }
  8. long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
  9. this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
  10. return this;
  11. }
  12. }
  13. @Override
  14. public void run() {
  15. Date actualExecutionTime = new Date();
  16. super.run();
  17. Date completionTime = new Date();
  18. synchronized (this.triggerContextMonitor) {
  19. Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
  20. this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
  21. if (!obtainCurrentFuture().isCancelled()) {
  22. schedule();
  23. }
  24. }
  25. }


第一次注册任务时触发schedule 方法(重点!!入参的第一个对象是当前线程)等到间隔时间后执行run 方法,super.run() 其实执行的就是我们定义好的任务代码,执行完后判断obtainCurrentFuture().isCancelled() 是否取消执行,没有的话再次调用schedule 方法。

这样的方法相当于线程套线程,只要任务不结束线程就不会停止,一开始我还在找触发的地方,一直没找到看到schedule 中的第一个入参是个this ,然后就释然了。

从这段方法可以发现,如果我们的任务没有执行完,是不会触发下一次schedule 的调用的,所以之前总担心cron 配置的任务间隔过短的话会启动多个任务,现在看来其实是多虑了。。。


使用@Async 实现其实是借助了spring aop 对执行的方法进行增强,每次执行时都是最终调用了MethodInterceptor 的invoke 方法,对目标方法进行增强

使用注解EnableAsync 时,会加载AsyncConfigurationSelector 类,它会通过selectImports 方法,告诉spring加载ProxyAsyncConfiguration。

这个地方其实是两种模式的,在配置EnableAsync 的时候可以设置mode 属性,然后根据mode 选择不同的配置类,这里默认是PROXY,所以只关注ProxyAsyncConfiguration 类即可。

  1. public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
  3. "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
  4. /**
  5. * Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}
  6. * for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},
  7. * respectively.
  8. */
  9. @Override
  10. @Nullable
  11. public String[] selectImports(AdviceMode adviceMode) {
  12. switch (adviceMode) {
  13. case PROXY:
  14. return new String[] {ProxyAsyncConfiguration.class.getName()};
  15. case ASPECTJ:
  17. default:
  18. return null;
  19. }
  20. }
  21. }

ProxyAsyncConfiguration 中会向容器注入AsyncAnnotationBeanPostProcessor bean,这个bean实现了两个接口

第一个 BeanFactoryAware.setBeanFactory() 的时候 new AsyncAnnotationAdvisor ,在这个Advisor 中主要做了两件事情

  1. buildAdvice :new AnnotationAsyncExecutionInterceptor 作为advise ,将在将来执行方法时进行代码增强。
  2. buildPointcut : 根据@Async 注解解析到类和方法上的切入点作为pointcut ,将在将来执行方法时判定是否执行上面的advise 的逻辑。

第二个 BeanPostProcessor.postProcessAfterInitialization 方法,其实是调用了父类AbstractAdvisingBeanPostProcessor

的postProcessAfterInitialization() 这个方法中关键的一个地方是通过ProxyFactory 来生成对应增强代理类。

  1. public Object postProcessAfterInitialization(Object bean, String beanName) {
  2. // ... 其他代码
  3. if (isEligible(bean, beanName)) {
  4. ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
  5. if (!proxyFactory.isProxyTargetClass()) {
  6. evaluateProxyInterfaces(bean.getClass(), proxyFactory);
  7. }
  8. proxyFactory.addAdvisor(this.advisor);
  9. customizeProxyFactory(proxyFactory);
  10. return proxyFactory.getProxy(getProxyClassLoader());
  11. }
  12. // No proxy needed.
  13. return bean;
  14. }

其中isEligible 方法,最终是调用AopUtils 类的方法来判定,当前bean 对象是否能被Advisor 增强。

AopUtils.canApply(this.advisor, targetClass);


面试常问spring如何生成代理对象,其实答案就在ProxyFactory 里面。

当使用ProxyFactory.getProxy() 的时候,会先获取AopProxy 对象,而获取AopProxy 对象时,你就能发现,最终返回的只有两种Proxy,一个是JdkDynamicAopProxy ,一个是ObjenesisCglibAopProxy。

  1. ProxyFactory.class
  2. public Object getProxy(@Nullable ClassLoader classLoader) {
  3. return createAopProxy().getProxy(classLoader);
  4. }
  5. ------
  6. ProxyCreatorSupport.class
  7. protected final synchronized AopProxy createAopProxy() {
  8. if (!this.active) {
  9. activate();
  10. }
  11. return getAopProxyFactory().createAopProxy(this);
  12. }
  13. ------
  14. DefaultAopProxyFactory.class:
  15. public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
  16. if (config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config)) {
  17. Class<?> targetClass = config.getTargetClass();
  18. if (targetClass == null) {
  19. throw new AopConfigException("TargetSource cannot determine target class: " +
  20. "Either an interface or a target is required for proxy creation.");
  21. }
  22. if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) {
  23. return new JdkDynamicAopProxy(config);
  24. }
  25. return new ObjenesisCglibAopProxy(config);
  26. }
  27. else {
  28. return new JdkDynamicAopProxy(config);
  29. }
  30. }


定时任务的执行主要是由ScheduledAnnotationBeanPostProcessor.postProcessAfterInitialization() 方法进行触发。

方法中会扫描所有@Scheduled 注释的方法,然后对这些方法执行processScheduled

  1. protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
  2. try {
  3. Runnable runnable = createRunnable(bean, method);
  4. boolean processedSchedule = false;
  5. String errorMessage =
  6. "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
  7. Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
  8. // 其他 任务类型处理
  9. // Check cron expression
  10. String cron = scheduled.cron();
  11. if (StringUtils.hasText(cron)) {
  12. String zone = scheduled.zone();
  13. if (this.embeddedValueResolver != null) {
  14. cron = this.embeddedValueResolver.resolveStringValue(cron);
  15. zone = this.embeddedValueResolver.resolveStringValue(zone);
  16. }
  17. if (StringUtils.hasLength(cron)) {
  18. Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
  19. processedSchedule = true;
  20. if (!Scheduled.CRON_DISABLED.equals(cron)) {
  21. TimeZone timeZone;
  22. if (StringUtils.hasText(zone)) {
  23. timeZone = StringUtils.parseTimeZoneString(zone);
  24. }
  25. else {
  26. timeZone = TimeZone.getDefault();
  27. }
  28. tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
  29. }
  30. }
  31. }
  32. // 其他 任务类型处理
  33. // Check whether we had any attribute set
  34. Assert.isTrue(processedSchedule, errorMessage);
  35. // Finally register the scheduled tasks
  36. synchronized (this.scheduledTasks) {
  37. Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
  38. regTasks.addAll(tasks);
  39. }
  40. }
  41. catch (IllegalArgumentException ex) {
  42. throw new IllegalStateException(
  43. "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
  44. }
  45. }

其中重点方法是createRunnable ,它会生成ScheduledMethodRunnable 对象

  1. protected Runnable createRunnable(Object target, Method method) {
  2. Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
  3. Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
  4. return new ScheduledMethodRunnable(target, invocableMethod);
  5. }

从上面第一部分解析可以指导任务都是由ReschedulingRunnable 线程自己调用自己实现定时执行,那它最终run 的方法其实是ScheduledMethodRunnable 的run 方法。

  1. public void run() {
  2. try {
  3. ReflectionUtils.makeAccessible(this.method);
  4. this.method.invoke(this.target);
  5. }
  6. catch (InvocationTargetException ex) {
  7. ReflectionUtils.rethrowRuntimeException(ex.getTargetException());
  8. }
  9. catch (IllegalAccessException ex) {
  10. throw new UndeclaredThrowableException(ex);
  11. }
  12. }

这里如果是第一种方案的话,this.target 是普通的代理对象,调用方法是同步执行的,第一个任务执行完才会触发第二个任务执行。

如果是第二种方案的话,this.target 是带有增强逻辑的代理对象

真正执行的方法是org.springframework.aop.framework.CglibAopProxy.DynamicAdvisedInterceptor#intercept 方法

然后是org.springframework.aop.framework.CglibAopProxy.CglibMethodInvocation#proceed 方法

再然后是org.springframework.aop.framework.ReflectiveMethodInvocation#proceed,最终执行的是else 结构,也就是((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);

  1. public Object proceed() throws Throwable {
  2. // We start with an index of -1 and increment early.
  3. if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
  4. return invokeJoinpoint();
  5. }
  6. Object interceptorOrInterceptionAdvice =
  7. this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
  8. if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
  9. // Evaluate dynamic method matcher here: static part will already have
  10. // been evaluated and found to match.
  11. InterceptorAndDynamicMethodMatcher dm =
  12. (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
  13. Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());
  14. if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) {
  15. return dm.interceptor.invoke(this);
  16. }
  17. else {
  18. // Dynamic matching failed.
  19. // Skip this interceptor and invoke the next in the chain.
  20. return proceed();
  21. }
  22. }
  23. else {
  24. // It's an interceptor, so we just invoke it: The pointcut will have
  25. // been evaluated statically before this object was constructed.
  26. return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
  27. }
  28. }

这里的interceptorOrInterceptionAdvice 其实就是AnnotationAsyncExecutionInterceptor,在invoke 方法中会获取配置taskScheduler ,进行任务处理,而这里就是把任务一个个丢到线程池里执行,这个就是和第一种方式最大的区别。

  1. public Object invoke(final MethodInvocation invocation) throws Throwable {
  2. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
  3. Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
  4. final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
  5. AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
  6. if (executor == null) {
  7. throw new IllegalStateException(
  8. "No executor specified and no default executor set on AsyncExecutionInterceptor either");
  9. }
  10. Callable<Object> task = () -> {
  11. try {
  12. Object result = invocation.proceed();
  13. if (result instanceof Future) {
  14. return ((Future<?>) result).get();
  15. }
  16. }
  17. catch (ExecutionException ex) {
  18. handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
  19. }
  20. catch (Throwable ex) {
  21. handleError(ex, userDeclaredMethod, invocation.getArguments());
  22. }
  23. return null;
  24. };
  25. return doSubmit(task, executor, invocation.getMethod().getReturnType());
  26. }

到这里其实你会发现,第一种配置方式和第二种配置方式道理上都是相同的,都是ReschedulingRunnable 的run 和schedule 配合触发定时执行任务,不同的地方在于执行任务时处理任务的方式不同(一个是单线程调用,一个是多线程调用)

再仔细想想,第一种配置是将不同的任务放在不同的线程中执行,任务数超过线程池线程数,就会有等待的情况;第二种是不同的任务在单线程处理所有的定时任务,每次处理的任务都是丢给另外一个线程池进行执行 enmmmm 是这样的


@Import 的使用

实现InitializingBean 接口类的初始化

实现BeanFactoryAware 接口类的初始化

实现ApplicationListener 接口类的初始化

bean 的初始化过程

BeanPostProcessor 接口类

spring 生成代理对象

spring 实现aop(Advisor Advice Pointcut)



  1. 最终执行的target 是不同的类型,一个是ConfigurationClassEnhancer 类型另一个又是CglibAopProxy 类型,为啥,他俩的区别?
  2. ConfigurationClassEnhancer 和 CglibAopProxy 的Classback 是如何配置进去的,又是如何使用的?
