赞
踩
- <!-- Note: this client version must match the RocketMQ version deployed on the server. -->
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.7.1</version>
- </dependency>
消息发送
消息消费
- package com.zqh.com.rocketmq.baseMsg;
-
- import org.apache.rocketmq.client.exception.MQBrokerException;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.client.producer.SendCallback;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.client.producer.SendStatus;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.remoting.exception.RemotingException;
-
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.TimeUnit;
-
- /**
-  * Demonstrates the three basic RocketMQ send modes: synchronous, asynchronous and one-way.
-  *
-  * <p>Meaning of the {@link SendStatus} values carried by a {@link SendResult}:
-  * <ul>
-  *   <li>{@code SEND_OK}: the message was sent successfully.</li>
-  *   <li>{@code FLUSH_DISK_TIMEOUT}: the message was sent, but the server timed out flushing it to
-  *       disk. It is already in the server queue and is lost only if the server goes down at that
-  *       moment.</li>
-  *   <li>{@code FLUSH_SLAVE_TIMEOUT}: the message was sent, but syncing it to the slave timed out.
-  *       It is already in the server queue and is lost only if the server goes down at that
-  *       moment.</li>
-  *   <li>{@code SLAVE_NOT_AVAILABLE}: the message was sent, but no slave was available. It is
-  *       already in the server queue and is lost only if the server goes down at that moment.</li>
-  * </ul>
-  */
- public class BaseMsgProducer {
-
-     final static String NAMESRV_ADDR = "192.168.0.30:9876";
-     final static String PRODUCER_GROUP = "base_group";
-     final static String TOPIC = "base_topic";
-     final static String TAGS = "base_tags";
-
-     /** Runs the synchronous demo; swap in one of the other send methods to try the other modes. */
-     public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
-         sendSyncMsg();
-     }
-
-     /**
-      * Sends 1000 messages synchronously: each send blocks until the result is received before the
-      * next message is sent. This reliable mode is widely used, e.g. for important notifications
-      * or SMS notifications.
-      */
-     public static void sendSyncMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
-         // Create the producer with its producer-group name and point it at the NameServer.
-         DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP + "_syncMsg");
-         producer.setNamesrvAddr(NAMESRV_ADDR);
-         producer.start();
-         try {
-             for (int i = 0; i < 1000; i++) {
-                 // Message = topic, tag, keys and body (encoded with an explicit charset).
-                 Message message = new Message(TOPIC, TAGS, System.currentTimeMillis() + "" + i,
-                         ("hello rocketMq" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
-                 // The result exposes the send status, message id and target queue.
-                 SendResult result = producer.send(message);
-                 System.out.println("发送结果:" + result);
-             }
-         } finally {
-             // Always release the producer, even when a send fails.
-             producer.shutdown();
-         }
-     }
-
-
-     /**
-      * Sends 10 messages asynchronously: the next message is sent without waiting for the previous
-      * result, and each result is handled in the {@link SendCallback}. Typically used when the
-      * sender cannot tolerate waiting long for the broker's response.
-      */
-     public static void sendASyncMsg() throws InterruptedException, RemotingException, MQClientException {
-         DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP + "_async");
-         producer.setNamesrvAddr(NAMESRV_ADDR);
-         producer.start();
-         try {
-             final CountDownLatch countDownLatch = new CountDownLatch(10);
-             for (int i = 0; i < 10; i++) {
-                 Message message = new Message(TOPIC, TAGS + "_async", System.currentTimeMillis() + "" + i,
-                         ("hello rocketMq" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
-                 producer.send(message, new SendCallback() {
-                     @Override
-                     public void onSuccess(SendResult sendResult) {
-                         System.out.println("发送结果:" + sendResult);
-                         countDownLatch.countDown();
-                     }
-
-                     @Override
-                     public void onException(Throwable throwable) {
-                         System.out.println("异常结果:" + throwable.getMessage());
-                         countDownLatch.countDown();
-                     }
-                 });
-             }
-
-             // Wait for the callbacks so the producer is not closed while sends are in flight,
-             // and report it when the wait times out instead of ignoring the outcome.
-             if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
-                 System.out.println("等待异步发送回调超时,未完成数量:" + countDownLatch.getCount());
-             }
-         } finally {
-             producer.shutdown();
-         }
-     }
-
-     /**
-      * Sends 10 one-way messages: no result is awaited and no callback runs. Suited to cases where
-      * the send result does not matter much, such as log shipping.
-      */
-     public static void sendOnewayMsg() throws InterruptedException, RemotingException, MQClientException {
-         DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP + "_oneway");
-         producer.setNamesrvAddr(NAMESRV_ADDR);
-         producer.start();
-         try {
-             for (int i = 0; i < 10; i++) {
-                 Message message = new Message(TOPIC, TAGS + "_oneway", System.currentTimeMillis() + "" + i,
-                         ("hello rocketMq" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
-                 producer.sendOneway(message);
-                 System.out.println("发送完成");
-             }
-         } finally {
-             producer.shutdown();
-         }
-     }
- }
- package com.zqh.com.rocketmq.baseMsg;
-
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
- import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
- import org.springframework.stereotype.Component;
-
- @Component
- public class BaseMsgConsumer {
- private DefaultMQPushConsumer consumer;
- private static final String CONSUMER_GROUP = "base_group";
- private final static String NAMESRV_ADDR = "192.168.0.30:9876";
- private final static String TOPIC = "base_topic";
- private final static String TAGS = "base_tags_async || base_tags_oneway || base_tags";
-
- public BaseMsgConsumer() throws MQClientException {
- System.out.println("初始化成功");
- consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
- consumer.setNamesrvAddr(NAMESRV_ADDR);
- consumer.subscribe(TOPIC, TAGS);
- // 分散消费 MessageModel.CLUSTERING 同一个 Consumer ID 所标识的所有 Consumer 分散消费消息。
- // 广播消费 MessageModel.BROADCASTING 同一个 Consumer ID 所标识的所有 Consumer 都会各自消费某条消息一次。
- consumer.setMessageModel(MessageModel.BROADCASTING);
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-
- // 并发消费模式 (MessageListenerConcurrently)
- consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
- System.out.println(Thread.currentThread().getName()
- + " Receive New Messages: " + list.size());
- list.forEach(msg -> {
- // 业务实现
- System.out.println(Thread.currentThread().getName() + "\t" + "组1消费" + msg.getKeys() + new String(msg.getBody()));
- });
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- });
- consumer.start();
- }
- }
- package com.zqh.com.rocketmq.batchMsg;
-
- import org.apache.rocketmq.client.exception.MQBrokerException;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.remoting.exception.RemotingException;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import java.util.ArrayList;
- import java.util.List;
-
- /**
-  * Batch-message demo.
-  *
-  * <p>Earlier examples send messages one at a time; a batch sends several messages in a single
-  * call. The total size of a batch (whether it holds one message or many) should not exceed 4 MB,
-  * the default limit. An oversized send is rejected with
-  * {@code CODE: 13 DESC: the message body size over max value, MAX: 4194304}.
-  *
-  * <p>Batches are typically flushed in one of two ways:
-  * <ul>
-  *   <li>after a certain number of messages has accumulated;</li>
-  *   <li>at a fixed interval (for example once a minute).</li>
-  * </ul>
-  */
- public class BatchMsgProducer {
-
-     private final static Logger logger = LoggerFactory.getLogger(BatchMsgProducer.class);
-     private static final String PRODUCER_GROUP = "batch_group";
-     private final static String NAMESRV_ADDR = "192.168.0.30:9876";
-     private final static String TOPIC = "batch_topic";
-     private final static String TAGS = "batch_tags";
-
-     /**
-      * Sends 10 batches of three small messages each and prints every send result.
-      */
-     public static void sendBatchMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
-         DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
-         producer.setNamesrvAddr(NAMESRV_ADDR);
-         producer.start();
-         try {
-             for (int i = 0; i < 10; i++) {
-                 // Build one batch; all three messages share the same topic and tag.
-                 List<Message> messageList = new ArrayList<>();
-                 for (int j = 0; j < 3; j++) {
-                     messageList.add(new Message(TOPIC, TAGS,
-                             "bytes".getBytes(java.nio.charset.StandardCharsets.UTF_8)));
-                 }
-                 SendResult sendResult = producer.send(messageList);
-                 System.out.println("发送结果:" + sendResult);
-             }
-         } finally {
-             // Always release the producer, even when a send fails.
-             producer.shutdown();
-         }
-     }
-
-     /** Entry point: runs the batch send demo. */
-     public static void main(String[] args) throws Exception {
-         sendBatchMsg();
-     }
- }
- package com.zqh.com.rocketmq.batchMsg;
-
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
-
- @Component
- public class BatchMsgConsumer {
- private final static Logger logger = LoggerFactory.getLogger(BatchMsgConsumer.class);
- private DefaultMQPushConsumer consumer;
- private static final String CONSUMER_GROUP = "batch_group";
- private final static String NAMESRV_ADDR = "192.168.0.30:9876";
- private final static String TOPIC = "batch_topic";
- private final static String TAGS = "batch_tags";
-
- public BatchMsgConsumer() throws MQClientException {
- System.out.println("初始化成功");
- consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
- consumer.setNamesrvAddr(NAMESRV_ADDR);
- consumer.subscribe(TOPIC, TAGS);
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-
- // 并发消费模式 (MessageListenerConcurrently)
- consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
- System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + list.size());
- list.forEach(msg -> {
- // 业务实现
- System.out.println(Thread.currentThread().getName() + "\t" + "组1消费" + msg.getKeys() + new String(msg.getBody()));
- });
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- });
- consumer.start();
- }
- }
- package com.zqh.com.rocketmq.delayMsg;
-
- import org.apache.rocketmq.client.exception.MQBrokerException;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.remoting.exception.RemotingException;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- /**
-  * Delayed-message demo.
-  *
-  * <p>Available delay levels, in order:
-  * {@code 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h}.
-  */
- public class DelayMsgProducer {
-
-     private final static Logger logger = LoggerFactory.getLogger(DelayMsgProducer.class);
-     private static final String PRODUCER_GROUP = "delay_group";
-     private final static String NAMESRV_ADDR = "192.168.0.30:9876";
-     private final static String TOPIC = "delay_topic";
-     private final static String TAGS = "delay_tags";
-
-
-     /**
-      * Sends 10 messages with delay level 3 (10 seconds) and prints every send result.
-      */
-     public static void sendDelayMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
-         DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
-         producer.setNamesrvAddr(NAMESRV_ADDR);
-         producer.start();
-         try {
-             for (int i = 0; i < 10; i++) {
-                 Message message = new Message(TOPIC, TAGS, System.currentTimeMillis() + "" + i,
-                         "rocketMq".getBytes(java.nio.charset.StandardCharsets.UTF_8));
-                 // Level 3 corresponds to a 10s delay.
-                 message.setDelayTimeLevel(3);
-                 SendResult sendResult = producer.send(message);
-                 System.out.println("发送结果:" + sendResult);
-             }
-         } finally {
-             // Always release the producer, even when a send fails.
-             producer.shutdown();
-         }
-     }
-
-     /** Entry point: runs the delayed send demo. */
-     public static void main(String[] args) throws Exception {
-         sendDelayMsg();
-     }
- }
5.2 消费端
- package com.zqh.com.rocketmq.delayMsg;
-
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
-
- @Component
- public class DelayMsgConsumer {
- private final static Logger logger = LoggerFactory.getLogger(DelayMsgConsumer.class);
- private DefaultMQPushConsumer consumer;
- private static final String CONSUMER_GROUP = "delay_group";
- private final static String NAMESRV_ADDR = "192.168.0.30:9876";
- private final static String TOPIC = "delay_topic";
- private final static String TAGS = "delay_tags";
-
- public DelayMsgConsumer() throws MQClientException {
- System.out.println("初始化成功");
- consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
- consumer.setNamesrvAddr(NAMESRV_ADDR);
- consumer.subscribe(TOPIC, TAGS);
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-
- // 并发消费模式 (MessageListenerConcurrently)
- consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
- System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + list.size());
- list.forEach(msg -> {
- // 业务实现
- System.out.println(Thread.currentThread().getName() + "\t" + "组1消费" + msg.getKeys() + new String(msg.getBody()));
- });
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- });
- consumer.start();
- }
- }
6.1 过滤消息
- package com.zqh.com.rocketmq.filterMsg;
-
- import org.apache.rocketmq.client.exception.MQBrokerException;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.remoting.exception.RemotingException;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- /**
-  * Filtered-message demo: every message carries a user property {@code i} that a consumer can
-  * filter on.
-  */
- public class FilterMsgProducer {
-
-     private final static Logger logger = LoggerFactory.getLogger(FilterMsgProducer.class);
-     private static final String PRODUCER_GROUP = "filter_group";
-     private final static String NAMESRV_ADDR = "192.168.0.30:9876";
-     private final static String TOPIC = "filter_topic";
-     private final static String TAGS = "filter_tags";
-
-     /**
-      * Sends 10 messages, attaching the loop index as user property {@code i}, and prints every
-      * send result.
-      */
-     public static void sendFilterMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
-         DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
-         producer.setNamesrvAddr(NAMESRV_ADDR);
-         producer.start();
-         try {
-             for (int i = 0; i < 10; i++) {
-                 Message message = new Message(TOPIC, TAGS, System.currentTimeMillis() + "" + i,
-                         "bytes".getBytes(java.nio.charset.StandardCharsets.UTF_8));
-                 // Attach a custom property that consumers can use in a filter expression.
-                 message.putUserProperty("i", i + "");
-
-                 SendResult sendResult = producer.send(message);
-                 System.out.println("发送结果:" + sendResult);
-             }
-         } finally {
-             // Always release the producer, even when a send fails.
-             producer.shutdown();
-         }
-     }
-
-     /** Entry point: runs the filtered send demo. */
-     public static void main(String[] args) throws Exception {
-         sendFilterMsg();
-     }
- }
6.2 消费端
- package com.zqh.com.rocketmq.filterMsg;
-
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.MessageSelector;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
-
- @Component
- public class FileterMsgConsumer {
- private final static Logger logger = LoggerFactory.getLogger(FileterMsgConsumer.class);
- private DefaultMQPushConsumer consumer;
- private static final String CONSUMER_GROUP = "filter_group";
- private final static String NAMESRV_ADDR = "192.168.0.30:9876";
- private final static String TOPIC = "filter_topic";
- // * 的话代表全部接收
- private final static String TAGS = "filter_tags";
-
- public FileterMsgConsumer() throws MQClientException {
- System.out.println("初始化成功");
- consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
- consumer.setNamesrvAddr(NAMESRV_ADDR);
- // 通过tag来过滤需要的
- // consumer.subscribe(TOPIC, TAGS);
- //通过sql过滤消息,只要i>=3的消息
- consumer.subscribe(TOPIC, MessageSelector.bySql("i>=3"));
-
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-
- // 并发消费模式 (MessageListenerConcurrently)
- consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
- System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + list.size());
- list.forEach(msg -> {
- // 业务实现
- System.out.println(Thread.currentThread().getName() + "\t" + "组1消费" + msg.getKeys() + new String(msg.getBody()));
- });
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- });
- consumer.start();
- }
- }
7.1 顺序消息
- package com.zqh.com.rocketmq.orderMsg;
-
- import org.apache.rocketmq.client.exception.MQBrokerException;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.client.producer.MessageQueueSelector;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.common.message.MessageQueue;
- import org.apache.rocketmq.remoting.exception.RemotingException;
-
- import java.util.ArrayList;
- import java.util.List;
-
- /**
-  * Ordered-message demo.
-  *
-  * <p>Send status values:
-  * <ul>
-  *   <li>{@code SEND_OK}: the message was sent successfully.</li>
-  *   <li>{@code FLUSH_DISK_TIMEOUT}: sent, but the server timed out flushing to disk; the message is
-  *       already in the server queue and is lost only if the server goes down at that moment.</li>
-  *   <li>{@code FLUSH_SLAVE_TIMEOUT}: sent, but syncing to the slave timed out; the message is
-  *       already in the server queue and is lost only if the server goes down at that moment.</li>
-  *   <li>{@code SLAVE_NOT_AVAILABLE}: sent, but no slave was available; the message is already in
-  *       the server queue and is lost only if the server goes down at that moment.</li>
-  * </ul>
-  *
-  * <p>Ways to guarantee ordering:
-  * <ul>
-  *   <li>Global ordering: usually achieved with a single queue, so all messages are FIFO.</li>
-  *   <li>Partial ordering: no single-queue restriction; messages that must be consumed in order
-  *       are simply routed to the same queue. For example, putting A1, B2, C3, D4 into queue 1
-  *       keeps them in order.</li>
-  * </ul>
-  */
- public class OrderMsgProducer {
-
-     final static String NAMESRV_ADDR = "192.168.0.30:9876";
-     final static String PRODUCER_GROUP = "order_group";
-     final static String TOPIC = "order_topic";
-     final static String TAGS = "order_tags";
-
-     /** Entry point: runs the ordered send demo. */
-     public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
-         sendOrderMsg();
-     }
-
-     /**
-      * Synchronously sends one message per {@link OrderMsg}, routing each to a queue chosen from
-      * the order id so that messages with the same id land in the same queue.
-      */
-     public static void sendOrderMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
-         // Create the producer with its producer-group name and point it at the NameServer.
-         DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
-         producer.setNamesrvAddr(NAMESRV_ADDR);
-         producer.start();
-
-         // Stateless, so one selector instance serves every send.
-         MessageQueueSelector selector = new MessageQueueSelector() {
-             /**
-              * @param list the available message queues
-              * @param msg  the message being sent
-              * @param o    the business key passed to {@code send} (here the order id)
-              * @return the queue this business key maps to
-              */
-             @Override
-             public MessageQueue select(List<MessageQueue> list, Message msg, Object o) {
-                 // Take the remainder before abs(): Math.abs(Integer.MIN_VALUE) is still negative,
-                 // which would otherwise produce a negative index.
-                 int index = Math.abs(o.hashCode() % list.size());
-                 return list.get(index);
-             }
-         };
-
-         try {
-             List<OrderMsg> orderMsgList = OrderMsg.orderMsgList();
-
-             int sequence = 0;
-             for (OrderMsg orderMsg : orderMsgList) {
-                 String keys = System.currentTimeMillis() + "" + sequence;
-                 byte[] body = ("hello rocketMq" + sequence).getBytes(java.nio.charset.StandardCharsets.UTF_8);
-                 sequence++;
-                 Message message = new Message(TOPIC, TAGS, keys, body);
-
-                 // Arguments: the message, the queue selector, and the business key used to pick
-                 // the queue (here the order id). A constant key such as 0 sends everything to one
-                 // queue, which gives global ordering.
-                 SendResult sendResult = producer.send(message, selector, orderMsg.getId());
-                 System.out.println("发送结果:" + sendResult);
-             }
-         } finally {
-             // Always release the producer, even when a send fails.
-             producer.shutdown();
-         }
-     }
- }
-
- class OrderMsg {
- private String id;
- private String msg;
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getMsg() {
- return msg;
- }
-
- public void setMsg(String msg) {
- this.msg = msg;
- }
-
- public OrderMsg(String id, String msg) {
- this.id = id;
- this.msg = msg;
- }
-
- public static List<OrderMsg> orderMsgList() {
- List<OrderMsg> objects = new ArrayList<>();
- for (int i = 0; i < 20; i++) {
- objects.add(new OrderMsg("12", i + ""));
- }
- return objects;
- }
- }
7.2 消费端
- package com.zqh.com.rocketmq.orderMsg;
-
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
-
- import java.time.LocalDateTime;
-
- @Component
- public class OrderMsgConsumer {
- private final static Logger logger = LoggerFactory.getLogger(OrderMsgConsumer.class);
- private DefaultMQPushConsumer consumer;
- private static final String CONSUMER_GROUP = "order_group";
- private final static String NAMESRV_ADDR = "192.168.0.30:9876";
- private final static String TOPIC = "order_topic";
- private final static String TAGS = "order_tags";
-
- public OrderMsgConsumer() throws MQClientException {
- System.out.println("初始化成功");
- consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
- consumer.setNamesrvAddr(NAMESRV_ADDR);
- consumer.subscribe(TOPIC, TAGS);
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- // 注册消息监听器
- // 顺序消息监听 - MessageListenerOrderly 使用该监听器时,只启用一个线程对一个队列进行消费
- // 即:一个队列只会被一个线程取到,第二个线程无法访问这个队列(对消费队列上锁,在消费消息之前,先去获取对应队列对应的锁,保证同一个队列不会被并发消费)
- consumer.registerMessageListener((MessageListenerOrderly) (list, consumeOrderlyContext) -> {
- System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + list.size());
- list.forEach(msg -> {
- System.out.println(Thread.currentThread().getName() + "\t" + LocalDateTime.now() + "\t\t消费消息:" + new String(msg.getBody()));
- }
- );
- return ConsumeOrderlyStatus.SUCCESS;
- });
- consumer.start();
- }
- }
8.1 事务消息
- package com.zqh.com.rocketmq.transactionMsg;
-
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.client.producer.LocalTransactionState;
- import org.apache.rocketmq.client.producer.SendResult;
- import org.apache.rocketmq.client.producer.TransactionListener;
- import org.apache.rocketmq.client.producer.TransactionMQProducer;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- /**
-  * Transactional-message demo: sends messages whose commit/rollback is decided by a local
-  * transaction callback, with a check-back callback for undecided messages.
-  */
- public class TransactionMsgProducer {
-
-     private final static Logger logger = LoggerFactory.getLogger(TransactionMsgProducer.class);
-     private static final String PRODUCER_GROUP = "transaction_group";
-     private final static String NAMESRV_ADDR = "192.168.0.30:9876";
-     private final static String TOPIC = "transaction_topic";
-     private final static String TAGS = "transaction_tags";
-
-     /**
-      * Starts a transactional producer and sends 10 transactional messages. The loop index is
-      * passed as the extra argument and decides the local transaction outcome: commit when
-      * {@code i % 3 == 0}, roll back when {@code i % 3 == 1}, otherwise unknown (left to the
-      * check-back).
-      *
-      * @throws MQClientException if starting the producer or sending a message fails
-      */
-     public static void sendTransactionMsg() throws MQClientException {
-         TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
-         producer.setNamesrvAddr(NAMESRV_ADDR);
-         producer.setTransactionListener(new TransactionListener() {
-             /**
-              * Executes the local transaction.
-              *
-              * @param message the message handed back; its transactionId uniquely identifies it
-              * @param o       the extra argument that was passed to the send call
-              * @return the transaction state: COMMIT_MESSAGE commits, ROLLBACK_MESSAGE rolls back,
-              *         UNKNOW leaves the decision to a later check-back
-              */
-             @Override
-             public LocalTransactionState executeLocalTransaction(Message message, Object o) {
-                 // A shared ConcurrentHashMap keyed by transactionId could record the outcome here
-                 // (e.g. 1 for success, -1 for failure) for the check-back to consult; omitted in this demo.
-                 System.out.print("执行本地事务" + message.getTransactionId());
-                 int i = (int) o;
-                 if (i % 3 == 0) {
-                     // Commit the message.
-                     System.out.println("提交");
-                     return LocalTransactionState.COMMIT_MESSAGE;
-                 } else if (i % 3 == 1) {
-                     // Roll the message back.
-                     System.out.println("回滚");
-                     return LocalTransactionState.ROLLBACK_MESSAGE;
-                 }
-                 // Undecided: resolved later by the check-back.
-                 System.out.println("回调");
-                 return LocalTransactionState.UNKNOW;
-             }
-
-             /**
-              * Check-back for the state of a transactional message.
-              *
-              * @param messageExt the message being checked; its transactionId can be used to look
-              *                   up how the local transaction ended
-              * @return the transaction state: COMMIT_MESSAGE commits, ROLLBACK_MESSAGE rolls back,
-              *         UNKNOW keeps the message undecided
-              */
-             @Override
-             public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
-                 String body = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
-                 System.out.println("消息回查:" + messageExt.getTransactionId() + "\t" + body);
-                 // A real implementation would query the database using data from the message body,
-                 // or consult the outcome recorded per transactionId during the local transaction.
-
-                 // The demo simply commits.
-                 return LocalTransactionState.COMMIT_MESSAGE;
-             }
-         });
-         // Start the producer.
-         producer.start();
-         // The producer must stay alive after sending so check-backs can be answered, so it is not
-         // shut down here. A JVM shutdown hook releases it when the process is terminated.
-         Runtime.getRuntime().addShutdownHook(new Thread(producer::shutdown));
-
-         for (int i = 0; i < 10; i++) {
-             Message message = new Message(TOPIC, TAGS, System.currentTimeMillis() + "" + i,
-                     ("rocketMq" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
-             // Arguments: the message, and an extra argument made available to
-             // executeLocalTransaction in the transaction listener.
-             SendResult sendResult = producer.sendMessageInTransaction(message, i);
-             System.out.println("发送消息:" + sendResult);
-         }
-     }
-
-     /** Entry point: runs the transactional send demo. */
-     public static void main(String[] args) throws Exception {
-         sendTransactionMsg();
-     }
- }
8.2 消费端
- package com.zqh.com.rocketmq.transactionMsg;
-
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.client.exception.MQClientException;
- import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
-
- @Component
- public class TransactionMsgConsumer {
- private final static Logger logger = LoggerFactory.getLogger(TransactionMsgConsumer.class);
- private DefaultMQPushConsumer consumer;
- private static final String CONSUMER_GROUP = "transaction_group";
- private final static String NAMESRV_ADDR = "192.168.0.30:9876";
- private final static String TOPIC = "transaction_topic";
- private final static String TAGS = "transaction_tags";
-
- public TransactionMsgConsumer() throws MQClientException {
- System.out.println("初始化成功");
- consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
- consumer.setNamesrvAddr(NAMESRV_ADDR);
- consumer.subscribe(TOPIC, TAGS);
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
-
- // 并发消费模式 (MessageListenerConcurrently)
- consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
- System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + list.size());
- list.forEach(msg -> {
- // 业务实现
- System.out.println(Thread.currentThread().getName() + "\t" + "组1消费" + msg.getKeys() + new String(msg.getBody()));
- });
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- });
- consumer.start();
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。