赞
踩
如果项目使用 Spring Boot 集成 RocketMQ,可以直接使用 RocketMQTemplate,代码看起来很简洁;但如果项目只使用了 Spring,就需要自己手写一堆代码来实现消息发送与消息监听。
下面就参考RocketMQTemplate自己实现一个基于注解的消息监听框架
先看一下在 Spring Boot 集成方式下,是怎样通过 @RocketMQMessageListener 注解注册消息监听的:
- // Example of the Spring Boot style: an annotated bean that receives User messages.
- @RocketMQMessageListener(
- topic = "test_topic", // topic: must be the same topic the producer sends to
- consumerGroup = "test_my-consumer", // group: does not have to match the producer group
- selectorExpression = "*") // tag filter
- @Component // the listener must be registered as a bean in the Spring container
- // the generic type must match the type of the received message
- public class TestListner implements RocketMQListener<User> {
-
- @Override
- public void onMessage(User user) {
- System.out.println(user);
- }
- }
接下来我们就自己实现@RocketMQMessageListener,我们为了区别就将注解命名为@RocketMqMsgListener
思路:
1.定义@RocketMqMsgListener
2.项目启动后扫描所有@RocketMqMsgListener注解的类
3.根据注解中的参数自动创建消费者
4.服务停止时自动停止消费者
1.定义@RocketMqMsgListener
- /**
- * Marks a class as a message listener and carries the settings used to
- * create its consumer.
- */
- @Target(ElementType.TYPE)
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- public @interface RocketMqMsgListener {
-
- /**
- * Topic to subscribe to.
- * @return the topic name
- */
- String topic();
-
- /**
- * Consumer group.
- * @return the consumer group name
- */
- String consumerGroup();
-
- /**
- * Subscription filter expression (a tag expression by default);
- * the default "*" selects everything.
- * @return the filter expression
- */
- String selectorExpression() default "*";
-
- /**
- * Filter type: SelectorType.TAG filters by tag, SelectorType.SQL92 filters by SQL.
- * @return the filter type
- */
- SelectorType selectorType() default SelectorType.TAG;
-
- /**
- * Message model. Use broadcasting when every subscriber should receive all
- * messages; use the clustering (load-balancing) model when each message
- * should be received by a single consumer.
- * @return the message model
- */
- MessageModel messageModel() default MessageModel.CLUSTERING;
-
- /**
- * Instance name; empty by default.
- * @return the instance name
- */
- String instanceName() default "";
- }
2.项目启动后扫描所有@RocketMqMsgListener注解的类
3.根据注解中的参数自动创建消费者
使用 Spring 自带的扫描机制:实现 ApplicationListener<ContextRefreshedEvent> 的类,会在容器初始化(或刷新)完成后自动执行其 onApplicationEvent(ContextRefreshedEvent event) 方法。
- public class MqMsgLoadListener implements ApplicationListener<ContextRefreshedEvent>, DisposableBean {
- //扫描的类
- private Map<String, Object> beanMap = new HashMap<>();
- //消费者列表
- private List<SimpleConsumer> consumerList = new ArrayList<>();
- @Autowired
- private RocketConfig config;
-
- @Override
- public void onApplicationEvent(ContextRefreshedEvent event) {
- //如果是第二次加载则不处理了
- if(event.getApplicationContext().getParent() != null) {
- return;
- }
-
- //获取注解类
- beanMap = event.getApplicationContext().getBeansWithAnnotation(RocketMqMsgListener.class);
- System.out.println("================"+beanMap.size());
- //未扫描到不处理
- if(beanMap == null || beanMap.size() == 0) {
- return;
- }
- for(Object bean : beanMap.values()) {
- createConsumer(bean);
- }
- }
-
- /**
- * 创建消费者
- * @param bean
- */
- private void createConsumer(Object bean) {
- if(!(bean instanceof MessageListenerConcurrently)) {
- return;
- }
- //获取注解
- RocketMqMsgListener mqMsgListener = bean.getClass().getAnnotation(RocketMqMsgListener.class);
- //配置
- ConsumerConfig consumerConfig = new ConsumerConfig();
- consumerConfig.setConsumeFromWhere(mqMsgListener.consumeFromWhere());
- consumerConfig.setTopic(mqMsgListener.topic());
- consumerConfig.setConsumerGroup(mqMsgListener.consumerGroup());
- consumerConfig.setSelectorExpression(mqMsgListener.selectorExpression());
- consumerConfig.setSelectorType(mqMsgListener.selectorType());
- consumerConfig.setMessageModel(mqMsgListener.messageModel());
- consumerConfig.setConsumeMode(mqMsgListener.consumeMode());
- if(!StringUtils.isEmpty(mqMsgListener.instanceName())) {
- consumerConfig.setInstanceName(mqMsgListener.instanceName());
- }
- //创建消费者
- SimpleConsumer consumer = new SimpleConsumer(config, consumerConfig, (MessageListenerConcurrently)bean);
- consumerList.add(consumer);
- //初始化并启动
- try {
- consumer.init();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 服务停止
- */
- @Override
- public void destroy() throws Exception {
- for(SimpleConsumer consumer : consumerList) {
- if(consumer == null) {
- continue;
- }
- consumer.destroy();
- }
- }
- }
xml中配置
-
- <!-- Registers the loader that starts the annotated message listeners -->
- <bean id="mqMsgLoadListener" class="com.test.rocket.annotation.listener.MqMsgLoadListener" />
4.服务停止时自动停止消费者
实现 DisposableBean 接口,服务停止时会自动调用 destroy() 方法,在 destroy() 中释放消费者资源。
- /**
-  * Called when the service stops; shuts down every consumer that was created.
-  */
- @Override
- public void destroy() throws Exception {
-     for (SimpleConsumer current : consumerList) {
-         // Skip empty slots instead of failing on them.
-         if (current != null) {
-             current.destroy();
-         }
-     }
- }
使用注解
- @Component
- @RocketMqMsgListener(topic = "test_topic", consumerGroup = "rocketmq-test")
- public class StringMessageListener implements MessageListenerConcurrently {
- private int total;
-
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- setTotal();
- for (MessageExt msg : msgs) {
- System.out.println("========收到消息:"+new String(msg.getBody())+"---------总数:"+total);
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
-
- public synchronized void setTotal() {
- total++;
- }
-
- public int getTotal(){
- return total;
- }
- }
完整代码
配置文件
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:context="http://www.springframework.org/schema/context"
- xsi:schemaLocation="http://www.springframework.org/schema/beans
- http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.springframework.org/schema/context
- http://www.springframework.org/schema/context/spring-context.xsd ">
-
- <!-- RocketMQ connection settings (name server address) -->
- <bean id="rocketMQConfig" class="com.test.config.RocketConfig">
- <constructor-arg name="address" value="192.168.1.113:9876"/>
- </bean>
-
- <!-- Default producer -->
- <bean id="producer" class="com.test.rocket.producer.SimpleProducer" init-method="init" destroy-method="destroy">
- <constructor-arg name="producerGroup" value="rocketmq-test" />
- <constructor-arg name="config" ref="rocketMQConfig"/>
- </bean>
-
- <!-- Packages scanned for components -->
- <context:component-scan base-package="com.test.**.mq.**" />
- <!-- Loader that starts the annotated message listeners -->
- <bean id="mqMsgLoadListener" class="com.test.rocket.annotation.listener.MqMsgLoadListener" />
- </beans>
RocketMqMsgListener
- /**
- * Marks a class as a message listener and carries the settings used to
- * create its consumer.
- */
- @Target(ElementType.TYPE)
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- public @interface RocketMqMsgListener {
-
- /**
- * Topic to subscribe to.
- * @return the topic name
- */
- String topic();
-
- /**
- * Consumer group.
- * @return the consumer group name
- */
- String consumerGroup();
-
- /**
- * Subscription filter expression (a tag expression by default);
- * the default "*" selects everything.
- * @return the filter expression
- */
- String selectorExpression() default "*";
-
- /**
- * Filter type: SelectorType.TAG filters by tag, SelectorType.SQL92 filters by SQL.
- * @return the filter type
- */
- SelectorType selectorType() default SelectorType.TAG;
-
- /**
- * Message model. Use broadcasting when every subscriber should receive all
- * messages; use the clustering (load-balancing) model when each message
- * should be received by a single consumer.
- * @return the message model
- */
- MessageModel messageModel() default MessageModel.CLUSTERING;
-
- /**
- * Instance name; empty by default.
- * @return the instance name
- */
- String instanceName() default "";
- }
RocketConfig
- /**
-  * Holds the RocketMQ server address shared by producers and consumers.
-  */
- public class RocketConfig implements Serializable {
-     private static final long serialVersionUID = 6807425160034094787L;
-
-     // Server address
-     private String address;
-
-     /** Creates a configuration whose address is set later through the setter. */
-     public RocketConfig() {
-     }
-
-     /**
-      * Creates a configuration for the given address.
-      * @param address server address; separate several with ';',
-      *                e.g. "192.168.25.135:9876;192.168.25.138:9876"
-      */
-     public RocketConfig(String address) {
-         setAddress(address);
-     }
-
-     /**
-      * @return the configured server address
-      */
-     public String getAddress() {
-         return this.address;
-     }
-
-     /**
-      * @param address the server address to use
-      */
-     public void setAddress(String address) {
-         this.address = address;
-     }
- }
ConsumerConfig
- /**
-  * Settings for one consumer: topic, consumer group, message filter,
-  * message model and an optional instance name.
-  */
- public class ConsumerConfig implements Serializable {
-     // 10022583900545392332 does not fit in a signed long literal and fails to
-     // compile; this is the same 64-bit pattern written as a valid long.
-     private static final long serialVersionUID = -8424160173164159284L;
-
-     /** Topic to subscribe to. */
-     private String topic;
-
-     /** Consumer group. */
-     private String consumerGroup;
-
-     /**
-      * Subscription filter. A tag expression by default, "*" selecting
-      * everything; when selectorType is SQL92 the value is an SQL expression.
-      */
-     private String selectorExpression = "*";
-
-     /** Filter type: SelectorType.TAG filters by tag, SelectorType.SQL92 by SQL. */
-     private SelectorType selectorType;
-
-     /**
-      * Message model. Broadcasting delivers every message to all subscribers;
-      * the clustering (load-balancing) model delivers each message to one consumer.
-      */
-     private MessageModel messageModel;
-
-     /** Instance name. */
-     private String instanceName;
-
-     public String getTopic() {
-         return topic;
-     }
-
-     public void setTopic(String topic) {
-         this.topic = topic;
-     }
-
-     public String getConsumerGroup() {
-         return consumerGroup;
-     }
-
-     public void setConsumerGroup(String consumerGroup) {
-         this.consumerGroup = consumerGroup;
-     }
-
-     public String getSelectorExpression() {
-         return selectorExpression;
-     }
-
-     public void setSelectorExpression(String selectorExpression) {
-         this.selectorExpression = selectorExpression;
-     }
-
-     public SelectorType getSelectorType() {
-         return selectorType;
-     }
-
-     public void setSelectorType(SelectorType selectorType) {
-         this.selectorType = selectorType;
-     }
-
-     public MessageModel getMessageModel() {
-         return messageModel;
-     }
-
-     // Setter matching getMessageModel(); it replaces a setter that assigned a
-     // field this class never declared.
-     public void setMessageModel(MessageModel messageModel) {
-         this.messageModel = messageModel;
-     }
-
-     public String getInstanceName() {
-         return instanceName;
-     }
-
-     public void setInstanceName(String instanceName) {
-         this.instanceName = instanceName;
-     }
- }
SelectorType
- /** How a consumer's selector expression is interpreted. */
- public enum SelectorType {
-
- /**
- * Filter messages by tag.
- */
- TAG,
-
- /**
- * Filter messages by an SQL expression.
- */
- SQL92
- }
SimpleProducer
- public class SimpleProducer {
- private String producerGroup;
- private RocketConfig config;
-
- private String instanceName;
-
- private DefaultMQProducer producer;
-
- public DefaultMQProducer getProducer() {
- return producer;
- }
-
- public SimpleProducer(RocketConfig config, String producerGroup){
- this.producerGroup = producerGroup;
- this.config = config;
- }
-
- public void init() throws MQClientException{
- producer = new DefaultMQProducer(producerGroup);
- producer.setNamesrvAddr(config.getAddress());
- if(instanceName != null) {
- producer.setInstanceName(instanceName);
- }
- producer.start();
- }
-
- public void destroy(){
- producer.shutdown();
- }
-
- public String getInstanceName() {
- return instanceName;
- }
-
- public void setInstanceName(String instanceName) {
- this.instanceName = instanceName;
- }
- }
SimpleConsumer
- /**
-  * Thin wrapper that creates, starts and shuts down a DefaultMQPushConsumer
-  * from a server configuration, a consumer configuration and a listener.
-  */
- public class SimpleConsumer {
-     private DefaultMQPushConsumer consumer;
-     // Server configuration
-     private RocketConfig config;
-     // Consumer configuration
-     private ConsumerConfig consumerConfig;
-     // Listener that receives the messages
-     private MessageListenerConcurrently messageListener;
-
-     /**
-      * @param config server configuration supplying the address
-      * @param consumerConfig topic, group, filter and model settings
-      * @param messageListener listener registered on the consumer
-      */
-     public SimpleConsumer(RocketConfig config, ConsumerConfig consumerConfig, MessageListenerConcurrently messageListener){
-         this.messageListener = messageListener;
-         this.config = config;
-         this.consumerConfig = consumerConfig;
-     }
-
-     /**
-      * Creates the consumer, subscribes it to the configured topic, registers
-      * the listener and starts it.
-      * @throws Exception if subscribing or starting fails
-      */
-     public void init() throws Exception{
-         consumer = new DefaultMQPushConsumer(consumerConfig.getConsumerGroup());
-         // Server address
-         consumer.setNamesrvAddr(config.getAddress());
-         // Optional instance name
-         if(consumerConfig.getInstanceName() != null) {
-             consumer.setInstanceName(consumerConfig.getInstanceName());
-         }
-         // Subscribe: SQL92 uses an SQL selector; TAG, and an unset selector type,
-         // use a tag expression so the consumer never starts without a subscription.
-         if(consumerConfig.getSelectorType() == SelectorType.SQL92) {
-             consumer.subscribe(consumerConfig.getTopic(), MessageSelector.bySql(consumerConfig.getSelectorExpression()));
-         } else {
-             consumer.subscribe(consumerConfig.getTopic(), consumerConfig.getSelectorExpression());
-         }
-         // Message model: only override the client's own default when one was configured.
-         if(consumerConfig.getMessageModel() != null) {
-             consumer.setMessageModel(consumerConfig.getMessageModel());
-         }
-         // Listener
-         consumer.registerMessageListener(messageListener);
-         consumer.start();
-     }
-
-     /** Shuts the consumer down; does nothing if init() never created one. */
-     public void destroy(){
-         if(consumer != null) {
-             consumer.shutdown();
-         }
-     }
-
-     /**
-      * @return the underlying consumer; null until init() has run
-      */
-     public DefaultMQPushConsumer getConsumer() {
-         return consumer;
-     }
- }
MqMsgLoadListener
- public class MqMsgLoadListener implements ApplicationListener<ContextRefreshedEvent>, DisposableBean {
- //扫描的类
- private Map<String, Object> beanMap = new HashMap<>();
- //消费者列表
- private List<SimpleConsumer> consumerList = new ArrayList<>();
- @Autowired
- private RocketConfig config;
-
- @Override
- public void onApplicationEvent(ContextRefreshedEvent event) {
- //如果是第二次加载则不处理了
- if(event.getApplicationContext().getParent() != null) {
- return;
- }
-
- //获取注解类
- beanMap = event.getApplicationContext().getBeansWithAnnotation(RocketMqMsgListener.class);
- System.out.println("================"+beanMap.size());
- //未扫描到不处理
- if(beanMap == null || beanMap.size() == 0) {
- return;
- }
- for(Object bean : beanMap.values()) {
- createConsumer(bean);
- }
- }
-
- /**
- * 创建消费者
- * @param bean
- */
- private void createConsumer(Object bean) {
- if(!(bean instanceof MessageListenerConcurrently)) {
- return;
- }
- //获取注解
- RocketMqMsgListener mqMsgListener = bean.getClass().getAnnotation(RocketMqMsgListener.class);
- //配置
- ConsumerConfig consumerConfig = new ConsumerConfig();
- consumerConfig.setTopic(mqMsgListener.topic());
- consumerConfig.setConsumerGroup(mqMsgListener.consumerGroup());
- consumerConfig.setSelectorExpression(mqMsgListener.selectorExpression());
- consumerConfig.setSelectorType(mqMsgListener.selectorType());
- consumerConfig.setMessageModel(mqMsgListener.messageModel());
- if(!StringUtils.isEmpty(mqMsgListener.instanceName())) {
- consumerConfig.setInstanceName(mqMsgListener.instanceName());
- }
- //创建消费者
- SimpleConsumer consumer = new SimpleConsumer(config, consumerConfig, (MessageListenerConcurrently)bean);
- consumerList.add(consumer);
- //初始化并启动
- try {
- consumer.init();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 服务停止
- */
- @Override
- public void destroy() throws Exception {
- for(SimpleConsumer consumer : consumerList) {
- if(consumer == null) {
- continue;
- }
- consumer.destroy();
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。