当前位置:   article > 正文

Rocket MQ更优雅的实现消息发送与消息监听—基于Spring注解实现消息监听_rocketmq springboot 定时发送

rocketmq springboot 定时发送

如果项目使用Spring boot集成Rocket MQ可以使用RocketMQTemplate,代码看起来很简洁,但是如果项目只使用了spring,就要自己手写一堆代码去实现消息发送与消息监听

下面就参考RocketMQTemplate自己实现一个基于注解的消息监听框架

先看一下RocketMQTemplate是怎么实现注解方式注册消息监听的

  1. @RocketMQMessageListener(
  2. topic = "test_topic", //topic:和消费者发送的topic相同
  3. consumerGroup = "test_my-consumer", //group:不用和生产者group相同
  4. selectorExpression = "*") //tag
  5. @Component //必须注入spring容器
  6. //泛型必须和接收的消息类型相同
  7. public class TestListner implements RocketMQListener<User> {
  8. @Override
  9. public void onMessage(User user) {
  10. System.out.println(user);
  11. }
  12. }

接下来我们就自己实现@RocketMQMessageListener,我们为了区别就将注解命名为@RocketMqMsgListener

思路:

1.定义@RocketMqMsgListener

2.项目启动后扫描所有@RocketMqMsgListener注解的类

3.根据注解中的参数自动创建消费者

4.服务停止时自动停止消费者

 

1.定义@RocketMqMsgListener

  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. public @interface RocketMqMsgListener {
  5. /**
  6. * 订阅主题
  7. * @return
  8. */
  9. String topic();
  10. /**
  11. * 消费者组
  12. * @return
  13. */
  14. String consumerGroup();
  15. /**
  16. * tag,订阅子查询条件,默认获取所有
  17. * @return
  18. */
  19. String selectorExpression() default "*";
  20. /**
  21. * 查询类型 SelectorType.TAG根据tag查询 SelectorType.SQL92根据sql查询
  22. * @return
  23. */
  24. SelectorType selectorType() default SelectorType.TAG;
  25. /**
  26. * 控制消息模式,如果希望所有订阅者都接收消息全部消息,广播是一个不错的选择。如果希望一个消费者接收则使用负载均衡模式
  27. * @return
  28. */
  29. MessageModel messageModel() default MessageModel.CLUSTERING;
  30. /**
  31. * 实例名称
  32. * @return
  33. */
  34. String instanceName() default "";
  35. }

2.项目启动后扫描所有@RocketMqMsgListener注解的类

3.根据注解中的参数自动创建消费者

使用spring自带扫描,类实现ApplicationListener<ContextRefreshedEvent>会在容器启动后自动执行其中的onApplicationEvent(ContextRefreshedEvent event)方法

  1. public class MqMsgLoadListener implements ApplicationListener<ContextRefreshedEvent>, DisposableBean {
  2. //扫描的类
  3. private Map<String, Object> beanMap = new HashMap<>();
  4. //消费者列表
  5. private List<SimpleConsumer> consumerList = new ArrayList<>();
  6. @Autowired
  7. private RocketConfig config;
  8. @Override
  9. public void onApplicationEvent(ContextRefreshedEvent event) {
  10. //如果是第二次加载则不处理了
  11. if(event.getApplicationContext().getParent() != null) {
  12. return;
  13. }
  14. //获取注解类
  15. beanMap = event.getApplicationContext().getBeansWithAnnotation(RocketMqMsgListener.class);
  16. System.out.println("================"+beanMap.size());
  17. //未扫描到不处理
  18. if(beanMap == null || beanMap.size() == 0) {
  19. return;
  20. }
  21. for(Object bean : beanMap.values()) {
  22. createConsumer(bean);
  23. }
  24. }
  25. /**
  26. * 创建消费者
  27. * @param bean
  28. */
  29. private void createConsumer(Object bean) {
  30. if(!(bean instanceof MessageListenerConcurrently)) {
  31. return;
  32. }
  33. //获取注解
  34. RocketMqMsgListener mqMsgListener = bean.getClass().getAnnotation(RocketMqMsgListener.class);
  35. //配置
  36. ConsumerConfig consumerConfig = new ConsumerConfig();
  37. consumerConfig.setConsumeFromWhere(mqMsgListener.consumeFromWhere());
  38. consumerConfig.setTopic(mqMsgListener.topic());
  39. consumerConfig.setConsumerGroup(mqMsgListener.consumerGroup());
  40. consumerConfig.setSelectorExpression(mqMsgListener.selectorExpression());
  41. consumerConfig.setSelectorType(mqMsgListener.selectorType());
  42. consumerConfig.setMessageModel(mqMsgListener.messageModel());
  43. consumerConfig.setConsumeMode(mqMsgListener.consumeMode());
  44. if(!StringUtils.isEmpty(mqMsgListener.instanceName())) {
  45. consumerConfig.setInstanceName(mqMsgListener.instanceName());
  46. }
  47. //创建消费者
  48. SimpleConsumer consumer = new SimpleConsumer(config, consumerConfig, (MessageListenerConcurrently)bean);
  49. consumerList.add(consumer);
  50. //初始化并启动
  51. try {
  52. consumer.init();
  53. } catch (Exception e) {
  54. e.printStackTrace();
  55. }
  56. }
  57. /**
  58. * 服务停止
  59. */
  60. @Override
  61. public void destroy() throws Exception {
  62. for(SimpleConsumer consumer : consumerList) {
  63. if(consumer == null) {
  64. continue;
  65. }
  66. consumer.destroy();
  67. }
  68. }
  69. }

xml中配置

  1. <!-- 启动监听 -->
  2. <bean id="mqMsgLoadListener" class="com.tes.rocket.annotation.listener.MqMsgLoadListener" />

4.服务停止时自动停止消费者

实现DisposableBean接口,服务停止时会自动调用destroy()方法,在destroy()释放消费者资源

  1. /**
  2. * 服务停止
  3. */
  4. @Override
  5. public void destroy() throws Exception {
  6. for(SimpleConsumer consumer : consumerList) {
  7. if(consumer == null) {
  8. continue;
  9. }
  10. consumer.destroy();
  11. }
  12. }

 

使用注解

  1. @Component
  2. @RocketMqMsgListener(topic = "test_topic", consumerGroup = "rocketmq-test")
  3. public class StringMessageListener implements MessageListenerConcurrently {
  4. private int total;
  5. @Override
  6. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  7. setTotal();
  8. for (MessageExt msg : msgs) {
  9. System.out.println("========收到消息:"+new String(msg.getBody())+"---------总数:"+total);
  10. }
  11. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  12. }
  13. public synchronized void setTotal() {
  14. total++;
  15. }
  16. public int getTotal(){
  17. return total;
  18. }
  19. }

 

完整代码

配置文件

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:context="http://www.springframework.org/schema/context"
  5. xsi:schemaLocation="http://www.springframework.org/schema/beans
  6. http://www.springframework.org/schema/beans/spring-beans.xsd
  7. http://www.springframework.org/schema/context
  8. http://www.springframework.org/schema/context/spring-context.xsd ">
  9. <!-- rocket mq配置 -->
  10. <bean id="rocketMQConfig" class="com.test.config.RocketConfig">
  11. <constructor-arg name="address" value="192.168.1.113:9876"/>
  12. </bean>
  13. <!-- 默认生产者 -->
  14. <bean id="producer" class="com.test.rocket.producer.SimpleProducer" init-method="init" destroy-method="destroy">
  15. <constructor-arg name="producerGroup" value="rocketmq-test" />
  16. <constructor-arg name="config" ref="rocketMQConfig"/>
  17. </bean>
  18. <!-- 扫描包 -->
  19. <context:component-scan base-package="com.test.**.mq.**" />
  20. <!-- 启动监听 -->
  21. <bean id="mqMsgLoadListener" class="com.test.rocket.annotation.listener.MqMsgLoadListener" />
  22. </beans>

RocketMqMsgListener

  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. public @interface RocketMqMsgListener {
  5. /**
  6. * 订阅主题
  7. * @return
  8. */
  9. String topic();
  10. /**
  11. * 消费者组
  12. * @return
  13. */
  14. String consumerGroup();
  15. /**
  16. * tag,订阅子查询条件,默认获取所有
  17. * @return
  18. */
  19. String selectorExpression() default "*";
  20. /**
  21. * 查询类型 SelectorType.TAG根据tag查询 SelectorType.SQL92根据sql查询
  22. * @return
  23. */
  24. SelectorType selectorType() default SelectorType.TAG;
  25. /**
  26. * 控制消息模式,如果希望所有订阅者都接收消息全部消息,广播是一个不错的选择。如果希望一个消费者接收则使用负载均衡模式
  27. * @return
  28. */
  29. MessageModel messageModel() default MessageModel.CLUSTERING;
  30. /**
  31. * 实例名称
  32. * @return
  33. */
  34. String instanceName() default "";
  35. }

RocketConfig

  1. public class RocketConfig implements Serializable {
  2. private static final long serialVersionUID = 6807425160034094787L;
  3. public RocketConfig() {
  4. }
  5. /**
  6. * 创建配置对象
  7. * @param address 服务地址,多个用;分割,例如"192.168.25.135:9876;192.168.25.138:9876"
  8. */
  9. public RocketConfig(String address) {
  10. this.address = address;
  11. }
  12. //地址
  13. private String address;
  14. public String getAddress() {
  15. return address;
  16. }
  17. public void setAddress(String address) {
  18. this.address = address;
  19. }
  20. }

ConsumerConfig

  1. public class ConsumerConfig implements Serializable {
  2. private static final long serialVersionUID = 10022583900545392332L;
  3. /**
  4. * 订阅主题
  5. * @return
  6. */
  7. private String topic;
  8. /**
  9. * 消费者组
  10. * @return
  11. */
  12. private String consumerGroup;
  13. /**
  14. * 默认是tag,订阅子查询条件,默认获取所有
  15. * 如果selectorType为sql,则值为sql
  16. * @return
  17. */
  18. private String selectorExpression = "*";
  19. /**
  20. * 查询类型 SelectorType.TAG根据tag查询 SelectorType.SQL92根据sql查询
  21. * @return
  22. */
  23. private SelectorType selectorType;
  24. /**
  25. * 控制消息模式,如果希望所有订阅者都接收消息全部消息,广播是一个不错的选择。如果希望一个消费者接收则使用负载均衡模式
  26. * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
  27. * @return
  28. */
  29. private MessageModel messageModel;
  30. /**
  31. * 实例名称
  32. * @return
  33. */
  34. private String instanceName;
  35. public String getTopic() {
  36. return topic;
  37. }
  38. public void setTopic(String topic) {
  39. this.topic = topic;
  40. }
  41. public String getConsumerGroup() {
  42. return consumerGroup;
  43. }
  44. public void setConsumerGroup(String consumerGroup) {
  45. this.consumerGroup = consumerGroup;
  46. }
  47. public String getSelectorExpression() {
  48. return selectorExpression;
  49. }
  50. public void setSelectorExpression(String selectorExpression) {
  51. this.selectorExpression = selectorExpression;
  52. }
  53. public SelectorType getSelectorType() {
  54. return selectorType;
  55. }
  56. public void setSelectorType(SelectorType selectorType) {
  57. this.selectorType = selectorType;
  58. }
  59. public MessageModel getMessageModel() {
  60. return messageModel;
  61. }
  62. public void setConsumeMode(ConsumeMode consumeMode) {
  63. this.consumeMode = consumeMode;
  64. }
  65. public String getInstanceName() {
  66. return instanceName;
  67. }
  68. public void setInstanceName(String instanceName) {
  69. this.instanceName = instanceName;
  70. }
  71. }

SelectorType

  1. public enum SelectorType {
  2. /**
  3. * @see 根据tag查询
  4. */
  5. TAG,
  6. /**
  7. * @see 根据sql查询
  8. */
  9. SQL92
  10. }

SimpleProducer

  1. public class SimpleProducer {
  2. private String producerGroup;
  3. private RocketConfig config;
  4. private String instanceName;
  5. private DefaultMQProducer producer;
  6. public DefaultMQProducer getProducer() {
  7. return producer;
  8. }
  9. public SimpleProducer(RocketConfig config, String producerGroup){
  10. this.producerGroup = producerGroup;
  11. this.config = config;
  12. }
  13. public void init() throws MQClientException{
  14. producer = new DefaultMQProducer(producerGroup);
  15. producer.setNamesrvAddr(config.getAddress());
  16. if(instanceName != null) {
  17. producer.setInstanceName(instanceName);
  18. }
  19. producer.start();
  20. }
  21. public void destroy(){
  22. producer.shutdown();
  23. }
  24. public String getInstanceName() {
  25. return instanceName;
  26. }
  27. public void setInstanceName(String instanceName) {
  28. this.instanceName = instanceName;
  29. }
  30. }

SimpleConsumer

  1. public class SimpleConsumer {
  2. private DefaultMQPushConsumer consumer;
  3. //rocketMq配置
  4. private RocketConfig config;
  5. //消费者配置
  6. private ConsumerConfig consumerConfig;
  7. //消息监听器
  8. private MessageListenerConcurrently messageListener;
  9. public SimpleConsumer(RocketConfig config, ConsumerConfig consumerConfig, MessageListenerConcurrently messageListener){
  10. this.messageListener = messageListener;
  11. this.config = config;
  12. this.consumerConfig = consumerConfig;
  13. }
  14. public void init() throws Exception{
  15. consumer = new DefaultMQPushConsumer(consumerConfig.getConsumerGroup());
  16. //服务地址
  17. consumer.setNamesrvAddr(config.getAddress());
  18. //实例名称
  19. if(consumerConfig.getInstanceName() != null) {
  20. consumer.setInstanceName(consumerConfig.getInstanceName());
  21. }
  22. //订阅主题
  23. if(consumerConfig.getSelectorType() == SelectorType.TAG) {//如果是根据tag过滤消息
  24. consumer.subscribe(consumerConfig.getTopic(), consumerConfig.getSelectorExpression());
  25. }else if(consumerConfig.getSelectorType() == SelectorType.SQL92) {//如果是根据sql过滤消息
  26. consumer.subscribe(consumerConfig.getTopic(), MessageSelector.bySql(consumerConfig.getSelectorExpression()));
  27. }
  28. //消息模式
  29. consumer.setMessageModel(consumerConfig.getMessageModel());
  30. //监听器
  31. consumer.registerMessageListener(messageListener);
  32. consumer.start();
  33. }
  34. public void destroy(){
  35. consumer.shutdown();
  36. }
  37. public DefaultMQPushConsumer getConsumer() {
  38. return consumer;
  39. }
  40. }

MqMsgLoadListener

  1. public class MqMsgLoadListener implements ApplicationListener<ContextRefreshedEvent>, DisposableBean {
  2. //扫描的类
  3. private Map<String, Object> beanMap = new HashMap<>();
  4. //消费者列表
  5. private List<SimpleConsumer> consumerList = new ArrayList<>();
  6. @Autowired
  7. private RocketConfig config;
  8. @Override
  9. public void onApplicationEvent(ContextRefreshedEvent event) {
  10. //如果是第二次加载则不处理了
  11. if(event.getApplicationContext().getParent() != null) {
  12. return;
  13. }
  14. //获取注解类
  15. beanMap = event.getApplicationContext().getBeansWithAnnotation(RocketMqMsgListener.class);
  16. System.out.println("================"+beanMap.size());
  17. //未扫描到不处理
  18. if(beanMap == null || beanMap.size() == 0) {
  19. return;
  20. }
  21. for(Object bean : beanMap.values()) {
  22. createConsumer(bean);
  23. }
  24. }
  25. /**
  26. * 创建消费者
  27. * @param bean
  28. */
  29. private void createConsumer(Object bean) {
  30. if(!(bean instanceof MessageListenerConcurrently)) {
  31. return;
  32. }
  33. //获取注解
  34. RocketMqMsgListener mqMsgListener = bean.getClass().getAnnotation(RocketMqMsgListener.class);
  35. //配置
  36. ConsumerConfig consumerConfig = new ConsumerConfig();
  37. consumerConfig.setTopic(mqMsgListener.topic());
  38. consumerConfig.setConsumerGroup(mqMsgListener.consumerGroup());
  39. consumerConfig.setSelectorExpression(mqMsgListener.selectorExpression());
  40. consumerConfig.setSelectorType(mqMsgListener.selectorType());
  41. consumerConfig.setMessageModel(mqMsgListener.messageModel());
  42. if(!StringUtils.isEmpty(mqMsgListener.instanceName())) {
  43. consumerConfig.setInstanceName(mqMsgListener.instanceName());
  44. }
  45. //创建消费者
  46. SimpleConsumer consumer = new SimpleConsumer(config, consumerConfig, (MessageListenerConcurrently)bean);
  47. consumerList.add(consumer);
  48. //初始化并启动
  49. try {
  50. consumer.init();
  51. } catch (Exception e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. /**
  56. * 服务停止
  57. */
  58. @Override
  59. public void destroy() throws Exception {
  60. for(SimpleConsumer consumer : consumerList) {
  61. if(consumer == null) {
  62. continue;
  63. }
  64. consumer.destroy();
  65. }
  66. }
  67. }

示例代码抛砖引玉,要用于生产环境还需要完善

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/603658
推荐阅读
相关标签
  

闽ICP备14008679号