当前位置:   article > 正文

springboot+redis自定义注解实现发布订阅_redismessagelistenercontainer 注解

redismessagelistenercontainer 注解

前言

最近开发了一个内部消息组件,逻辑大体是通过定义注解 @MessageHub,在启动时扫描全部bean中有使用了该注解的方法后台创建一个常驻线程代理消费数据,当线程消费到数据就回写到对应加了注解的方法里。

  1. less复制代码@Slf4j
  2. @Service
  3. public class RedisConsumerDemo {
  4. @MessageHub(topic = "${uptown.topic}", type = "REDIS_PUBSUB")
  5. public void consumer(Object message) {
  6. log.info("pubsub info {} ", message);
  7. }
  8. }

实现redis的队列、stream方式实现都很简单,唯独发布订阅方式,网上的demo全都是一个固定套路,通过redis容器注入监听器,而且回写非常死板。

常规写法

常规实现reids的发布订阅模式写法一共三步

  1. 创建消息监听器
  1. typescript复制代码@Bean
  2. public MessageListenerAdapter smsExpirationListener(TestSubscriber messageListener) {
  3. return new MessageListenerAdapter(messageListener, "onMessage");
  4. }
  1. 创建订阅器
  1. java复制代码
  2. @Component
  3. public class TestSubscriber implements MessageListener {
  4. @Override
  5. public void onMessage(Message message, byte[] pattern) {
  6. log.info("get data :{}", msg);
  7. }
  8. }
  1. 向redis容器中添加消息监听器
  1. java复制代码@Configuration
  2. public class RedisConfig {
  3. @Bean
  4. public RedisMessageListenerContainer container(
  5. RedisConnectionFactory redisConnectionFactory,
  6. MessageListenerAdapter smsExpirationListener) {
  7. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  8. container.setConnectionFactory(redisConnectionFactory);
  9. container.addMessageListener(smsExpirationListener, new PatternTopic("test"));
  10. return container;
  11. }
  12. }

这样定义非常简单明了,但是有个问题是太代码僵硬了,创建监听者很不灵活,只能指定内部的onMessage方法,那么怎么才能融入到我们的内部消息流转中间件里呢。

自定义注解实现

我们内部组件抽象了两个方法,生产和消费,但这两个方法逻辑截然不同,生产方法是暴露给serverice层接口调用,调用方在调用生产方法后能直接知道生产了几条数据和成功与否。而消费方法是配合Spring生命周期函数服务启动时建立常驻消费线程的。

  1. scss复制代码
  2. /**
  3. * 生产消息
  4. */
  5. Integer producer(MessageForm messageForm);
  6. /**
  7. * 消费消息
  8. */
  9. void consumer(ConsumerAdapterForm adapterForm);

生产消息当然很容易实现,只需要调用已经封装好的convertAndSend方法。

less复制代码stringRedisTemplate.convertAndSend(messageForm.getTopic(), messageForm.getMessage());

消费方法就有说法了,动态生成监听者的场景下使用redis容器用代码挨个注册已经满足不了了,但仔细过一遍源代码就会发现,监听类的构造方法的入参只有两个,第一个需要回调的代理类,第二个消费到数据后回调的方法。

  1. vbnet复制代码/**
  2. * Create a new {@link MessageListenerAdapter} for the given delegate.
  3. *
  4. * @param delegate the delegate object
  5. * @param defaultListenerMethod method to call when a message comes
  6. * @see #getListenerMethodName
  7. */
  8. public MessageListenerAdapter(Object delegate, String defaultListenerMethod) {
  9. this(delegate);
  10. setDefaultListenerMethod(defaultListenerMethod);
  11. }

那么好了好了,方案有了,本质上就是把RedisMessageListenerContainer注入进来之后,扫描项目里所有加了 @MessageHub 的bean,包装成监听类加载到容器里就完事了。怎么扫描的代码就不再赘述了,实现Spring的生命周期函数BeanPostProcessor#postProcessAfterInitialization,在这里用AnnotationUtils判断是否标注了注解。

  1. ini复制代码MessageHub annotation = AnnotationUtils.findAnnotation(method, MessageHub.class);
  2. if (annotation == null) {
  3. continue;
  4. }

标注了后判断如果是发布订阅,进入发布订阅的实现类。

  1. java复制代码@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS)
  2. @Service("redisPubSubProcessor")
  3. public class RedisPubSubProcessor extends MessageHubServiceImpl {
  4. @Resource
  5. RedisMessageListenerContainer redisPubSubContainer;
  6. @Override
  7. public void produce(ProducerAdapterForm producerAdapterForm) {
  8. stringRedisTemplate.convertAndSend(producerAdapterForm.getTopic(), producerAdapterForm.getMessage());
  9. }
  10. @Override
  11. public void consume(ConsumerAdapterForm messageForm) {
  12. MessageListenerAdapter adapter = new MessageListenerAdapter(messageForm.getBean(), messageForm.getInvokeMethod().getName());
  13. adapter.afterPropertiesSet();
  14. redisPubSubContainer.addMessageListener(adapter, new PatternTopic(messageForm.getTopic()));
  15. }
  16. @Bean
  17. public RedisMessageListenerContainer redisPubSubContainer(RedisConnectionFactory connectionFactory) {
  18. RedisMessageListenerContainer container = new RedisMessageListenerContainer();
  19. container.setConnectionFactory(connectionFactory);
  20. return container;
  21. }
  22. }

首先先将RedisMessageListenerContainer注入到Spring容器里,produce方法只需要调用下现程的api。consume方法由于上一步我们获取了bean和对应的method,直接用MessageListenerAdapter的构造器创建出监听器来,这里有个坑,需要手动调用adapter.afterPropertiesSet()设置一些必要的属性,这个在常规写法里框架帮忙做了。如果不调用的话会出一些空指针之类的bug。

随后把监听器add到容器就实现了方法代理,背后的线程监听到数据会回调到标注了 @MessageHub 的方法里

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/754539
推荐阅读
相关标签
  

闽ICP备14008679号