当前位置:   article > 正文

【Spring源码】Spring Event事件_spring events 能支持集群消费么

spring events 能支持集群消费么

目录

1、前言

2、什么是Spring Event?

3、基本使用

3.1、定义事件

3.2、发布事件

3.3、监听事件

3.3.1、继承ApplicationListener

3.3.2、使用@EventListener注解

4、Spring Event是同步还是异步?

4.1、源码实现

4.2、如何实现异步

4.2.1、使用@Async注解

4.2.2、手动实现异步线程池

4.2.3、自定义ApplicationEventMulticaster

5、@TransactionalEventListener

5.1、基本使用


1、前言

事件发布/订阅机制在实际项目中很经常用到,一方面可以很容易让我们的代码进行解耦,另一方面可以很方便的进行一对一或一对多的消息通信,是一种常见的观察者设计模式,具有很好的扩展性。今天就来讲一下Spring的事件机制。

2、什么是Spring Event?

Spring框架中的事件是一种观察者设计模式的实现,用于在应用程序中处理各种状态变化。事件驱动编程是一种流行的编程范式,其中组件之间的通信是通过事件(或消息)进行的。Spring的事件机制允许对象在状态发生变化时发布事件,其他对象则可以订阅这些事件并在事件发生时执行特定的操作。

3、基本使用

Spring Event的使用基本有以下几个步骤:定义事件,发布事件,监听事件

3.1、定义事件

先定义一个事件Event,继承Spring的ApplicationEvent,声明构造函数将需要传递的事件信息包装为业务事件类。如:

  1. /**
  2. * 这里定义事件DamIllegalDataEvent。
  3. */
  4. public class DamIllegalDataEvent extends ApplicationEvent {
  5. // 声明构造函数,接收DamIllegalDataDto集合传递到事件中
  6. public DamIllegalDataEvent(List<DamIllegalDataDto> list) {
  7. super(list);
  8. }
  9. }

3.2、发布事件

发布事件时可以注入ApplicationEventPublisher,也可以获取到ApplicationContext,然后调用publisherEvent()方法推送事件。

  1. @RestController
  2. @RequestMapping("anno/dam")
  3. public class DamTestController {
  4. @Autowired
  5. private ApplicationEventPublisher applicationPushBuilder;
  6. @GetMapping("test_audit")
  7. public String test_audit(){
  8. DamIllegalDataDto build = DamIllegalDataDto.builder().illegalData("11111").source("2222").functionDesc("数据清理中错误了").functionName("333").build();
  9. // 注入applicationPushBuilder
  10. applicationPushBuilder.publishEvent(new DamIllegalDataEvent(Collections.singletonList(build)));
  11. // 这里也可以直接使用hutool工具类直接发布
  12. SpringUtil.publishEvent(new DamIllegalDataEvent(Collections.singletonList(build)));
  13. return "ok";
  14. }
  15. }

3.3、监听事件

监听事件也可称为订阅事件,即当事件发布了之后,需要监听该事件并进行消费。Spring里面提供了两种事件订阅的方式:

  • 继承ApplicationListener,并实现onApplicationEvent方法。
  • 使用@EventListener注解方法。

3.3.1、继承ApplicationListener

创建一个监听器DamIllegalDataEventListener继承ApplicationListener,通过泛型指定需要监听的事件类。如:

  1. @Slf4j
  2. @Component
  3. public class DamIllegalDataEventListener implements ApplicationListener<DamIllegalDataEvent> {
  4. @Autowired
  5. private DamIllegalDataAuditService damIllegalDataAuditService;
  6. @Override
  7. public void onApplicationEvent(DamIllegalDataEvent event) {
  8. LOGGER.info("异常数据审计事件开始执行...");
  9. List<DamIllegalDataDto> damIllegalDataDtos = (List<DamIllegalDataDto>) event.getSource();
  10. // todo......
  11. doSomething();
  12. }
  13. }

3.3.2、使用@EventListener注解

使用@EventListener注解方法,将其包装为事件处理器。它适用于:1. 不想为每个事件处理都创建一个ApplicationListener实现类;2. 希望支持更复杂的事件条件过滤。@EventListener的classes属性可以过滤事件类型,而condition属性可以根据事件对象是否满足条件表达式来过滤事件。

  1. @Slf4j
  2. @Component
  3. public class DamIllegalDataEventListener {
  4. /**
  5. * EventListener注解定义事件处理器,并指定监听事件为DamIllegalDataEvent。
  6. * condition声明只有事件的code==200时,才进入该事件
  7. */
  8. @EventListener(classes = {DamIllegalDataEvent.class}, condition="#event.code==200")
  9. public void onApplicationEvent(DamIllegalDataEvent event) {
  10. LOGGER.info("异常数据审计事件开始执行...");
  11. List<DamIllegalDataDto> damIllegalDataDtos = (List<DamIllegalDataDto>) event.getSource();
  12. // todo......
  13. doSomething();
  14. }
  15. }

4、Spring Event是同步还是异步?

默认情况下 Spring Event是同步执行的。你怎么这么确定?我们先来演示下上面的demo。先实现一个测试接口,该接口发布了一个事件,发布完后打印一行日志:

  1. @GetMapping("test_audit")
  2. public String test_audit(){
  3. DamIllegalDataDto build = DamIllegalDataDto.builder().illegalData("11111").source("2222").functionDesc("数据清理中错误了").functionName("333").build();
  4. SpringUtil.publishEvent(new DamIllegalDataEvent(Collections.singletonList(build)));
  5. System.out.println("接口请求完成......");
  6. return "ok";
  7. }

事件监听中打印一行日志,并睡眠5s:

  1. @Slf4j
  2. @Component
  3. public class DamIllegalDataEventListener implements ApplicationListener<DamIllegalDataEvent> {
  4. @Override
  5. public void onApplicationEvent(DamIllegalDataEvent event) {
  6. LOGGER.info("异常数据审计事件开始执行...");
  7. ThreadUtil.sleep(5000);
  8. }
  9. }

执行查看结果,可以发现不管如何请求,日志打印总是按顺序执行,并且会间隔5S。

4.1、源码实现

如果还是不信?那我们来看源码:org.springframework.context.ApplicationEventPublisher#publishEvent(java.lang.Object),断点跟进到org.springframework.context.support.AbstractApplicationContext#publishEvent(java.lang.Object, org.springframework.core.ResolvableType)。

  1. protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
  2. // 包装ApplicationEvent
  3. ApplicationEvent applicationEvent;
  4. if (event instanceof ApplicationEvent) {
  5. applicationEvent = (ApplicationEvent) event;
  6. }
  7. else {
  8. applicationEvent = new PayloadApplicationEvent<>(this, event);
  9. if (eventType == null) {
  10. eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
  11. }
  12. }
  13. // 考虑到部分事件在Listener注册之前就发布了,因此先保存起来
  14. if (this.earlyApplicationEvents != null) {
  15. this.earlyApplicationEvents.add(applicationEvent);
  16. }
  17. else {
  18. // 重点是这里
  19. // 铜通过getApplicationEventMulticaster()获取事件发布器;
  20. // 调用multicastEvent方法发布事件
  21. getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
  22. }
  23. // 同时给父容器发布事件
  24. if (this.parent != null) {
  25. if (this.parent instanceof AbstractApplicationContext) {
  26. ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
  27. }
  28. else {
  29. this.parent.publishEvent(event);
  30. }
  31. }
  32. }

跟进multicastEvent()方法,org.springframework.context.event.SimpleApplicationEventMulticaster#multicastEvent(org.springframework.context.ApplicationEvent, org.springframework.core.ResolvableType):

  1. @Override
  2. public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
  3. ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
  4. Executor executor = getTaskExecutor();
  5. for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
  6. // 这里可以看出,如果有指定任务执行器,那么就异步执行;否则直接调用,也就是同步执行。
  7. if (executor != null) {
  8. executor.execute(() -> invokeListener(listener, event));
  9. }
  10. else {
  11. invokeListener(listener, event);
  12. }
  13. }
  14. }

4.2、如何实现异步

实现异步方式,可以有3中实现:

  • 使用@Async 注解
  • 手动实现异步线程池
  • 自定义ApplicationEventMulticaster

4.2.1、使用@Async注解

使用这个很简单,只要在事件监听方法上添加@Async注解即可,springboot的启动器需要开启异步@EnableAsync。

  1. @Async
  2. @Override
  3. public void onApplicationEvent(DamIllegalDataEvent event) {
  4. LOGGER.info("异常数据审计事件开始执行...");
  5. ThreadUtil.sleep(5000);
  6. }

注意:

使用@Async时,最好自己配置相应的线程池核心数以及延迟队列等等。由于Spring中使用@Async异步线程每次都会创建一个新线程执行,如果滥用 它,可能会有内存问题。

4.2.2、手动实现异步线程池

顾名思义就是手动创建一个线程池执行,与@Async类似。

  1. @Slf4j
  2. @Component
  3. public class DamIllegalDataEventListener implements ApplicationListener<DamIllegalDataEvent> {
  4. @Override
  5. public void onApplicationEvent(DamIllegalDataEvent event) {
  6. ThreadUtil.execAsync(() -> {
  7. LOGGER.info("异常数据审计事件开始执行...");
  8. ThreadUtil.sleep(5000);
  9. });
  10. }
  11. }

4.2.3、自定义ApplicationEventMulticaster

由于Spring容器会优先使用beanName为applicationEventMulticater 的bean作为事件转发处理器,如果不存在则默认使用SimpleApplicationEventMulticaster作为事件转发处理器,它默认是同步执行的。但它支持设置Executor,那么我们可以将自定义的线程池处理器作为Executor,以此来支持异步执行。

  1. @Configuration
  2. public class DamEventConfig {
  3. @Bean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME)
  4. public SimpleApplicationEventMulticaster eventMulticaster(){
  5. SimpleApplicationEventMulticaster simpleApplicationEventMulticaster = new SimpleApplicationEventMulticaster();
  6. simpleApplicationEventMulticaster.setTaskExecutor(taskExecutor());
  7. return simpleApplicationEventMulticaster;
  8. }
  9. /**
  10. * 目前服务器为8c,默认给他4个,一般事件推送的情况不会多。如果多的话,请检查一下业务使用
  11. * @return
  12. */
  13. @Bean
  14. public TaskExecutor taskExecutor(){
  15. ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  16. executor.setCorePoolSize(4);
  17. return executor;
  18. }
  19. }

配置完之后,事件监听那边都无需修改。

注意:

这种方式的配置是全局性的,一旦配置了之后,所有的事件都是异步的形式处理。如果需要个别业务是同步的,那么此种方式要特别注意。

5、@TransactionalEventListener

提到事件,这里再提一个注解@TransactionalEventListener,也即感知事务,基于事件形式与事务的某个阶段进行绑定。比如在事务提交之前或之后进行一些业务的处理,如短信提醒等等。@TransactionEventListener允许事件处理方法感知事务。它的phase属性,表示希望在事务的哪个阶段执行事件处理。

  1. @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. @EventListener
  5. public @interface TransactionalEventListener {
  6. /**
  7. * Phase to bind the handling of an event to.
  8. * <p>The default phase is {@link TransactionPhase#AFTER_COMMIT}.
  9. * <p>If no transaction is in progress, the event is not processed at
  10. * all unless {@link #fallbackExecution} has been enabled explicitly.
  11. */
  12. TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;
  13. /**
  14. * Whether the event should be handled if no transaction is running.
  15. */
  16. boolean fallbackExecution() default false;
  17. /**
  18. * Alias for {@link #classes}.
  19. */
  20. @AliasFor(annotation = EventListener.class, attribute = "classes")
  21. Class<?>[] value() default {};
  22. /**
  23. * The event classes that this listener handles.
  24. * <p>If this attribute is specified with a single value, the annotated
  25. * method may optionally accept a single parameter. However, if this
  26. * attribute is specified with multiple values, the annotated method
  27. * must <em>not</em> declare any parameters.
  28. */
  29. @AliasFor(annotation = EventListener.class, attribute = "classes")
  30. Class<?>[] classes() default {};
  31. /**
  32. * Spring Expression Language (SpEL) attribute used for making the event
  33. * handling conditional.
  34. * <p>The default is {@code ""}, meaning the event is always handled.
  35. * @see EventListener#condition
  36. */
  37. @AliasFor(annotation = EventListener.class, attribute = "condition")
  38. String condition() default "";
  39. /**
  40. * An optional identifier for the listener, defaulting to the fully-qualified
  41. * signature of the declaring method (e.g. "mypackage.MyClass.myMethod()").
  42. * @since 5.3
  43. * @see EventListener#id
  44. * @see TransactionalApplicationListener#getListenerId()
  45. */
  46. @AliasFor(annotation = EventListener.class, attribute = "id")
  47. String id() default "";
  48. }

TransactionPhase枚举声明了事务提交的各个阶段:

  1. public enum TransactionPhase {
  2. /**
  3. * Handle the event before transaction commit.
  4. * @see TransactionSynchronization#beforeCommit(boolean)
  5. */
  6. BEFORE_COMMIT,
  7. /**
  8. * Handle the event after the commit has completed successfully.
  9. * <p>Note: This is a specialization of {@link #AFTER_COMPLETION} and therefore
  10. * executes in the same sequence of events as {@code AFTER_COMPLETION}
  11. * (and not in {@link TransactionSynchronization#afterCommit()}).
  12. * <p>Interactions with the underlying transactional resource will not be
  13. * committed in this phase. See
  14. * {@link TransactionSynchronization#afterCompletion(int)} for details.
  15. * @see TransactionSynchronization#afterCompletion(int)
  16. * @see TransactionSynchronization#STATUS_COMMITTED
  17. */
  18. AFTER_COMMIT,
  19. /**
  20. * Handle the event if the transaction has rolled back.
  21. * <p>Note: This is a specialization of {@link #AFTER_COMPLETION} and therefore
  22. * executes in the same sequence of events as {@code AFTER_COMPLETION}.
  23. * <p>Interactions with the underlying transactional resource will not be
  24. * committed in this phase. See
  25. * {@link TransactionSynchronization#afterCompletion(int)} for details.
  26. * @see TransactionSynchronization#afterCompletion(int)
  27. * @see TransactionSynchronization#STATUS_ROLLED_BACK
  28. */
  29. AFTER_ROLLBACK,
  30. /**
  31. * Handle the event after the transaction has completed.
  32. * <p>For more fine-grained events, use {@link #AFTER_COMMIT} or
  33. * {@link #AFTER_ROLLBACK} to intercept transaction commit
  34. * or rollback, respectively.
  35. * <p>Interactions with the underlying transactional resource will not be
  36. * committed in this phase. See
  37. * {@link TransactionSynchronization#afterCompletion(int)} for details.
  38. * @see TransactionSynchronization#afterCompletion(int)
  39. */
  40. AFTER_COMPLETION
  41. }

5.1、基本使用

在含有事务的方法里发布事件:

  1. @Transactional(rollbackFor = Exception.class)
  2. public void test(){
  3. DamIllegalDataAudit audit = new DamIllegalDataAudit();
  4. audit.setId("1726931543097610240");
  5. audit.setRemark("xxx");
  6. this.baseMapper.updateById(audit);
  7. DamIllegalDataDto build = DamIllegalDataDto.builder().illegalData("11111").source("2222").functionDesc("数据清理中错误了").functionName("333").build();
  8. applicationEventPublisher.publishEvent(new DamIllegalDataEvent(Collections.singletonList(build)));
  9. }

定义感知事务监听:

  1. @Component
  2. public class TransactionalEventProcess {
  3. @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
  4. public void afterCommit(DamIllegalDataEvent event){
  5. System.out.println("事务提交后事件处理");
  6. }
  7. @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
  8. public void afterRollback(DamIllegalDataEvent event){
  9. System.out.println("事务回滚后事件处理");
  10. }
  11. }

当执行事务方法时候,可以发现:

注意:

如果事件自定义了ApplicationEventMulticaster,让事件变成异步,那么该感知事务会失效。

但是如果使用@Async或手动定义了 异步线程池ThreadUtil.execAsync还是可以生效的。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/198151
推荐阅读
相关标签
  

闽ICP备14008679号