当前位置:   article > 正文

Spring Rocketmq 事务消息 @RocketMQMessageListener注解的使用

@rocketmqmessagelistener


1、 RocketMQMessageListener参数讲解

@RocketMQMessageListener事务消息监听器

2、参数一 :consumeMode

    /**
	 *控制消费模式,您可以选择并发或有序接收消息。
     */
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
  • 1
  • 2
  • 3
  • 4

3、参数二:messageModel


    /**
	  * 控制消息模式,
	  * 广播模式:所有消费者都能接受到消息
	  * 集群模式:无论有多少个消费者,只有一个消费者能够接收到消息。
      */
    MessageModel messageModel() default MessageModel.CLUSTERING;

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

3、参数三:selectorExpression

    /**
     * 控制可以选择哪个消息
     */
    String selectorExpression() default "*";
 

 	// 同步顺序消息 根据下面使用方式使用即可,在主题上面拼接
    rocketMQTemplate.syncSendOrderly("MQ_TOPIC:user", info);
     
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

4、参数四:consumerGroup

    /**
      *
      *概念:消费者组(多个消费者) 此参数相同即为同一个消费者组
      *作用:集群模式负载均衡的实现,广播模式的通知的实现
      *
      */
    String consumerGroup();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

5、参数:topic

    /**
     * Topic name. 主题
     * 指该消费者组所订阅的消息服务
     * 
     */
    String topic();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

6、代码示例

//consumeMode = ConsumeMode.ORDERLY,messageModel = MessageModel.BROADCASTING,这样会报 messageModel BROADCASTING does not support ORDERLY message!
@Service("MqConsumer")
//@RocketMQMessageListener事务消息监听器
//consumeMode 消费模式,默认值 ConsumeMode.CONCURRENTLY 并行处理;ConsumeMode.ORDERLY 按顺序处理
//messageModel 消息模型,默认值 MessageModel.CLUSTERING 集群;MessageModel.BROADCASTING 广播
@RocketMQMessageListener(consumerGroup = "testmq2222", topic = "MQ_TOPIC",
        //selectorType = SelectorType.TAG,
//        selectorExpression = "user",
        consumeMode = ConsumeMode.ORDERLY,
        messageModel = MessageModel.CLUSTERING)
public class MqConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
    int i = 0;

    //消息接收处理方法
    @Override
    public void onMessage(MessageExt message) {
        String msg = null;
        try {
            msg = new String(message.getBody(), "utf-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        System.err.println("MqConsumer1111111111 接收到消息:tag:" + message.getTags() + "   count:" + (i++)
                + "  QueueId:" + message.getQueueId() + " 消息[body]:" + msg);
    }


    //    该方法重写消息监听器的属性
      @Override
    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
        // 设置消费者消息重试次数
        defaultMQPushConsumer.setMaxReconsumeTimes(3);
        //        设置实例名称
        defaultMQPushConsumer.setInstanceName("mqconsumer1");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

注意:顺序处理不能和广播模式同时使用,应该广播模式是属于并发的,而顺序是强调FIFO原则,广播模式不能保证顺序一致性。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/603665
推荐阅读
相关标签
  

闽ICP备14008679号