赞
踩
@RocketMQMessageListener事务消息监听器
/**
*控制消费模式,您可以选择并发或有序接收消息。
*/
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
/**
* 控制消息模式,
* 广播模式:所有消费者都能接受到消息
* 集群模式:无论有多少个消费者,只有一个消费者能够接收到消息。
*/
MessageModel messageModel() default MessageModel.CLUSTERING;
/**
* 控制可以选择哪个消息
*/
String selectorExpression() default "*";
// 同步顺序消息 根据下面使用方式使用即可,在主题上面拼接
rocketMQTemplate.syncSendOrderly("MQ_TOPIC:user", info);
/**
*
*概念:消费者组(多个消费者) 此参数相同即为同一个消费者组
*作用:集群模式负载均衡的实现,广播模式的通知的实现
*
*/
String consumerGroup();
/**
* Topic name. 主题
* 指该消费者组所订阅的消息服务
*
*/
String topic();
//consumeMode = ConsumeMode.ORDERLY,messageModel = MessageModel.BROADCASTING,这样会报 messageModel BROADCASTING does not support ORDERLY message! @Service("MqConsumer") //@RocketMQMessageListener事务消息监听器 //consumeMode 消费模式,默认值 ConsumeMode.CONCURRENTLY 并行处理;ConsumeMode.ORDERLY 按顺序处理 //messageModel 消息模型,默认值 MessageModel.CLUSTERING 集群;MessageModel.BROADCASTING 广播 @RocketMQMessageListener(consumerGroup = "testmq2222", topic = "MQ_TOPIC", //selectorType = SelectorType.TAG, // selectorExpression = "user", consumeMode = ConsumeMode.ORDERLY, messageModel = MessageModel.CLUSTERING) public class MqConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener { int i = 0; //消息接收处理方法 @Override public void onMessage(MessageExt message) { String msg = null; try { msg = new String(message.getBody(), "utf-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } System.err.println("MqConsumer1111111111 接收到消息:tag:" + message.getTags() + " count:" + (i++) + " QueueId:" + message.getQueueId() + " 消息[body]:" + msg); } // 该方法重写消息监听器的属性 @Override public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) { // 设置消费者消息重试次数 defaultMQPushConsumer.setMaxReconsumeTimes(3); // 设置实例名称 defaultMQPushConsumer.setInstanceName("mqconsumer1"); }
注意:顺序处理不能和广播模式同时使用,应该广播模式是属于并发的,而顺序是强调FIFO原则,广播模式不能保证顺序一致性。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。