当前位置:   article > 正文

Springboot2.X 集成 RocketMQ_mqclientexception: code: 13 desc: the message body

mqclientexception: code: 13 desc: the message body size over max value, max:

废话少说,上才艺

1.首先导入maven依赖(请自行选择对应的版本)

  1. <!--注意: 这里的版本,要和部署在服务器上的版本号一致-->
  2. <dependency>
  3. <groupId>org.apache.rocketmq</groupId>
  4. <artifactId>rocketmq-client</artifactId>
  5. <version>4.7.1</version>
  6. </dependency>

2.RocketMQ基本使用流程

消息发送

  • 创建生产者producer,并指定生产者组名
  • 指定NameServer(RocketMQ 的连接地址)
  • 启动producer
  • 创建消息对象,指定主题topic,tag和消息体
  • 发送消息
  • 关闭producer

消息消费

  • 创建消费者consumer,指定消费组名
  • 指定NameServer
  • 订阅主题topic和tag
  • 注册消息监听器,设置回调函数,处理消息
  • 启动消费者consumer

3.1 基础消息发送

  1. package com.zqh.com.rocketmq.baseMsg;
  2. import org.apache.rocketmq.client.exception.MQBrokerException;
  3. import org.apache.rocketmq.client.exception.MQClientException;
  4. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  5. import org.apache.rocketmq.client.producer.SendCallback;
  6. import org.apache.rocketmq.client.producer.SendResult;
  7. import org.apache.rocketmq.client.producer.SendStatus;
  8. import org.apache.rocketmq.common.message.Message;
  9. import org.apache.rocketmq.remoting.exception.RemotingException;
  10. import java.util.concurrent.CountDownLatch;
  11. import java.util.concurrent.TimeUnit;
  12. /**
  13. * SEND_OK:消息发送成功
  14. * FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
  15. * FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到 Slave 时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
  16. * SLAVE_NOT_AVAILABLE:消息发送成功,但是此时 Slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
  17. */
  18. public class BaseMsgProducer {
  19. final static String NAMESRV_ADDR = "192.168.0.30:9876";
  20. final static String PRODUCER_GROUP = "base_group";
  21. final static String TOPIC = "base_topic";
  22. final static String TAGS = "base_tags";
  23. public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
  24. sendSyncMsg();
  25. // sendASyncMsg();
  26. // sendOnewayMsg();
  27. }
  28. /**
  29. * 发送同步消息需要接到消息结果之后再发送下一个消息,这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
  30. */
  31. public static void sendSyncMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
  32. // 创建生产者producer,并指定生产者组名
  33. DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP + "_syncMsg");
  34. //指定NameServer(生产者需要把提供的消息注册到nameServer,方便消费端查找和订阅)
  35. producer.setNamesrvAddr(NAMESRV_ADDR);
  36. producer.start();
  37. for (int i = 0; i < 1000; i++) {
  38. //创建消息对象,指定主题topic,tag和消息体
  39. Message message = new Message(TOPIC, TAGS, System.currentTimeMillis() + "" + i, ("hello rocketMq" + i).getBytes());
  40. //发送消息
  41. SendResult result = producer.send(message);
  42. //发送状态
  43. SendStatus sendStatus = result.getSendStatus();
  44. //消息ID
  45. String msgId = result.getMsgId();
  46. //消息队列ID
  47. int queueId = result.getMessageQueue().getQueueId();
  48. System.out.println("发送结果:" + result);
  49. }
  50. //6.- 关闭producer
  51. producer.shutdown();
  52. }
  53. /**
  54. * 异步消息不必等待返回结果,立即发送下一个消息,可以通过send(Message msg, SendCallback sendCallback)中的回调函数,对返回结果进行处理。异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
  55. */
  56. public static void sendASyncMsg() throws InterruptedException, RemotingException, MQClientException {
  57. DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP + "_async");
  58. producer.setNamesrvAddr(NAMESRV_ADDR);
  59. producer.start();
  60. final CountDownLatch countDownLatch = new CountDownLatch(10);
  61. for (int i = 0; i < 10; i++) {
  62. Message message = new Message(TOPIC, TAGS + "_async", System.currentTimeMillis() + "" + i, ("hello rocketMq" + i).getBytes());
  63. producer.send(message, new SendCallback() {
  64. @Override
  65. public void onSuccess(SendResult sendResult) {
  66. countDownLatch.countDown();
  67. System.out.println("发送结果:" + sendResult);
  68. }
  69. @Override
  70. public void onException(Throwable throwable) {
  71. countDownLatch.countDown();
  72. System.out.println("异常结果:" + throwable.getMessage());
  73. }
  74. });
  75. }
  76. countDownLatch.await(5, TimeUnit.SECONDS);
  77. // 避免提前关闭了,所以等发送完再关闭
  78. producer.shutdown();
  79. }
  80. /**
  81. * 单向消息通俗来说,就是发送消息不必等待返回结果,也无需执行回调函数。这种方式主要用在不特别关心发送结果的场景,例如日志发送。
  82. */
  83. public static void sendOnewayMsg() throws InterruptedException, RemotingException, MQClientException {
  84. DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP + "_oneway");
  85. producer.setNamesrvAddr(NAMESRV_ADDR);
  86. producer.start();
  87. for (int i = 0; i < 10; i++) {
  88. Message message = new Message(TOPIC, TAGS + "_oneway", System.currentTimeMillis() + "" + i, ("hello rocketMq" + i).getBytes());
  89. producer.sendOneway(message);
  90. System.out.println("发送完成");
  91. }
  92. producer.shutdown();
  93. }
  94. }

3.2 消费者

  1. package com.zqh.com.rocketmq.baseMsg;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  5. import org.apache.rocketmq.client.exception.MQClientException;
  6. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  7. import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
  8. import org.springframework.stereotype.Component;
  9. @Component
  10. public class BaseMsgConsumer {
  11. private DefaultMQPushConsumer consumer;
  12. private static final String CONSUMER_GROUP = "base_group";
  13. private final static String NAMESRV_ADDR = "192.168.0.30:9876";
  14. private final static String TOPIC = "base_topic";
  15. private final static String TAGS = "base_tags_async || base_tags_oneway || base_tags";
  16. public BaseMsgConsumer() throws MQClientException {
  17. System.out.println("初始化成功");
  18. consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
  19. consumer.setNamesrvAddr(NAMESRV_ADDR);
  20. consumer.subscribe(TOPIC, TAGS);
  21. // 分散消费 MessageModel.CLUSTERING 同一个 Consumer ID 所标识的所有 Consumer 分散消费消息。
  22. // 广播消费 MessageModel.BROADCASTING 同一个 Consumer ID 所标识的所有 Consumer 都会各自消费某条消息一次。
  23. consumer.setMessageModel(MessageModel.BROADCASTING);
  24. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  25. // 并发消费模式 (MessageListenerConcurrently)
  26. consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
  27. System.out.println(Thread.currentThread().getName()
  28. + " Receive New Messages: " + list.size());
  29. list.forEach(msg -> {
  30. // 业务实现
  31. System.out.println(Thread.currentThread().getName() + "\t" + "组1消费" + msg.getKeys() + new String(msg.getBody()));
  32. });
  33. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  34. });
  35. consumer.start();
  36. }
  37. }

4.1 批量消息

  1. package com.zqh.com.rocketmq.batchMsg;
  2. import org.apache.rocketmq.client.exception.MQBrokerException;
  3. import org.apache.rocketmq.client.exception.MQClientException;
  4. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  5. import org.apache.rocketmq.client.producer.SendResult;
  6. import org.apache.rocketmq.common.message.Message;
  7. import org.apache.rocketmq.remoting.exception.RemotingException;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import java.util.ArrayList;
  11. import java.util.List;
  12. /**
  13. * 批量信息
  14. * 前面发送的消息都是一条一条发送,批量消息则是一次发送多条消息,这一批消息总大小(无论是单条消息还是消息总大小)不应该超过4MB(默认大小)
  15. * 批量消息一般有两种使用情况:
  16. * 达到一定消息数量之后发送
  17. * 一定时间发送(比如1分钟发送一次)
  18. */
  19. public class BatchMsgProducer {
  20. private final static Logger logger = LoggerFactory.getLogger(BatchMsgProducer.class);
  21. private static final String PRODUCER_GROUP = "batch_group";
  22. private final static String NAMESRV_ADDR = "192.168.0.30:9876";
  23. private final static String TOPIC = "batch_topic";
  24. private final static String TAGS = "batch_tags";
  25. public static void sendBatchMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
  26. DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
  27. producer.setNamesrvAddr(NAMESRV_ADDR);
  28. producer.start();
  29. for (int i = 0; i < 10; i++) {
  30. // 创建消息集合
  31. List<Message> messageList = new ArrayList<>();
  32. // CODE: 13 DESC: the message body size over max value, MAX: 4194304
  33. byte[] bytes = new byte[1024 * 1024 * 3];
  34. messageList.add(new Message(TOPIC, TAGS, "bytes".getBytes()));
  35. messageList.add(new Message(TOPIC, TAGS, "bytes".getBytes()));
  36. messageList.add(new Message(TOPIC, TAGS, "bytes".getBytes()));
  37. SendResult sendResult = producer.send(messageList);
  38. System.out.println("发送结果:" + sendResult);
  39. }
  40. producer.shutdown();
  41. }
  42. public static void main(String[] args) throws Exception {
  43. sendBatchMsg();
  44. }
  45. }

4.2 消费者

  1. package com.zqh.com.rocketmq.batchMsg;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  5. import org.apache.rocketmq.client.exception.MQClientException;
  6. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import org.springframework.stereotype.Component;
  10. @Component
  11. public class BatchMsgConsumer {
  12. private final static Logger logger = LoggerFactory.getLogger(BatchMsgConsumer.class);
  13. private DefaultMQPushConsumer consumer;
  14. private static final String CONSUMER_GROUP = "batch_group";
  15. private final static String NAMESRV_ADDR = "192.168.0.30:9876";
  16. private final static String TOPIC = "batch_topic";
  17. private final static String TAGS = "batch_tags";
  18. public BatchMsgConsumer() throws MQClientException {
  19. System.out.println("初始化成功");
  20. consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
  21. consumer.setNamesrvAddr(NAMESRV_ADDR);
  22. consumer.subscribe(TOPIC, TAGS);
  23. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  24. // 并发消费模式 (MessageListenerConcurrently)
  25. consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
  26. System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + list.size());
  27. list.forEach(msg -> {
  28. // 业务实现
  29. System.out.println(Thread.currentThread().getName() + "\t" + "组1消费" + msg.getKeys() + new String(msg.getBody()));
  30. });
  31. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  32. });
  33. consumer.start();
  34. }
  35. }

5.1 延时消息

  1. package com.zqh.com.rocketmq.delayMsg;
  2. import org.apache.rocketmq.client.exception.MQBrokerException;
  3. import org.apache.rocketmq.client.exception.MQClientException;
  4. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  5. import org.apache.rocketmq.client.producer.SendResult;
  6. import org.apache.rocketmq.common.message.Message;
  7. import org.apache.rocketmq.remoting.exception.RemotingException;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. /**
  11. * 延时信息
  12. * 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
  13. */
  14. public class DelayMsgProducer {
  15. private final static Logger logger = LoggerFactory.getLogger(DelayMsgProducer.class);
  16. private static final String PRODUCER_GROUP = "delay_group";
  17. private final static String NAMESRV_ADDR = "192.168.0.30:9876";
  18. private final static String TOPIC = "delay_topic";
  19. private final static String TAGS = "delay_tags";
  20. public static void sendDelayMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
  21. DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
  22. producer.setNamesrvAddr(NAMESRV_ADDR);
  23. producer.start();
  24. for (int i = 0; i < 10; i++) {
  25. Message message = new Message(TOPIC, TAGS, System.currentTimeMillis() + "" + i, "rocketMq".getBytes());
  26. // 3对应的10s
  27. message.setDelayTimeLevel(3);
  28. // 发送消息
  29. SendResult sendResult = producer.send(message);
  30. System.out.println("发送结果:" + sendResult);
  31. }
  32. producer.shutdown();
  33. }
  34. public static void main(String[] args) throws Exception {
  35. sendDelayMsg();
  36. }
  37. }

5.2 消费端

  1. package com.zqh.com.rocketmq.delayMsg;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  5. import org.apache.rocketmq.client.exception.MQClientException;
  6. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import org.springframework.stereotype.Component;
  10. @Component
  11. public class DelayMsgConsumer {
  12. private final static Logger logger = LoggerFactory.getLogger(DelayMsgConsumer.class);
  13. private DefaultMQPushConsumer consumer;
  14. private static final String CONSUMER_GROUP = "delay_group";
  15. private final static String NAMESRV_ADDR = "192.168.0.30:9876";
  16. private final static String TOPIC = "delay_topic";
  17. private final static String TAGS = "delay_tags";
  18. public DelayMsgConsumer() throws MQClientException {
  19. System.out.println("初始化成功");
  20. consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
  21. consumer.setNamesrvAddr(NAMESRV_ADDR);
  22. consumer.subscribe(TOPIC, TAGS);
  23. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  24. // 并发消费模式 (MessageListenerConcurrently)
  25. consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
  26. System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + list.size());
  27. list.forEach(msg -> {
  28. // 业务实现
  29. System.out.println(Thread.currentThread().getName() + "\t" + "组1消费" + msg.getKeys() + new String(msg.getBody()));
  30. });
  31. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  32. });
  33. consumer.start();
  34. }
  35. }

6.1 过滤消息

  1. package com.zqh.com.rocketmq.filterMsg;
  2. import org.apache.rocketmq.client.exception.MQBrokerException;
  3. import org.apache.rocketmq.client.exception.MQClientException;
  4. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  5. import org.apache.rocketmq.client.producer.SendResult;
  6. import org.apache.rocketmq.common.message.Message;
  7. import org.apache.rocketmq.remoting.exception.RemotingException;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. /**
  11. * 过滤信息
  12. */
  13. public class FilterMsgProducer {
  14. private final static Logger logger = LoggerFactory.getLogger(FilterMsgProducer.class);
  15. private static final String PRODUCER_GROUP = "filter_group";
  16. private final static String NAMESRV_ADDR = "192.168.0.30:9876";
  17. private final static String TOPIC = "filter_topic";
  18. private final static String TAGS = "filter_tags";
  19. public static void sendFilterMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
  20. DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
  21. producer.setNamesrvAddr(NAMESRV_ADDR);
  22. producer.start();
  23. for (int i = 0; i < 10; i++) {
  24. Message message = new Message(TOPIC, TAGS, System.currentTimeMillis() + "" + i, "bytes".getBytes());
  25. // 绑定自定义属性
  26. message.putUserProperty("i", i + "");
  27. SendResult sendResult = producer.send(message);
  28. System.out.println("发送结果:" + sendResult);
  29. }
  30. producer.shutdown();
  31. }
  32. public static void main(String[] args) throws Exception {
  33. sendFilterMsg();
  34. }
  35. }

6.2 消费端

  1. package com.zqh.com.rocketmq.filterMsg;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.MessageSelector;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  5. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  6. import org.apache.rocketmq.client.exception.MQClientException;
  7. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import org.springframework.stereotype.Component;
  11. @Component
  12. public class FileterMsgConsumer {
  13. private final static Logger logger = LoggerFactory.getLogger(FileterMsgConsumer.class);
  14. private DefaultMQPushConsumer consumer;
  15. private static final String CONSUMER_GROUP = "filter_group";
  16. private final static String NAMESRV_ADDR = "192.168.0.30:9876";
  17. private final static String TOPIC = "filter_topic";
  18. // * 的话代表全部接收
  19. private final static String TAGS = "filter_tags";
  20. public FileterMsgConsumer() throws MQClientException {
  21. System.out.println("初始化成功");
  22. consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
  23. consumer.setNamesrvAddr(NAMESRV_ADDR);
  24. // 通过tag来过滤需要的
  25. // consumer.subscribe(TOPIC, TAGS);
  26. //通过sql过滤消息,只要i>=3的消息
  27. consumer.subscribe(TOPIC, MessageSelector.bySql("i>=3"));
  28. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  29. // 并发消费模式 (MessageListenerConcurrently)
  30. consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
  31. System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + list.size());
  32. list.forEach(msg -> {
  33. // 业务实现
  34. System.out.println(Thread.currentThread().getName() + "\t" + "组1消费" + msg.getKeys() + new String(msg.getBody()));
  35. });
  36. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  37. });
  38. consumer.start();
  39. }
  40. }

7.1 顺序消息

  1. package com.zqh.com.rocketmq.orderMsg;
  2. import org.apache.rocketmq.client.exception.MQBrokerException;
  3. import org.apache.rocketmq.client.exception.MQClientException;
  4. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  5. import org.apache.rocketmq.client.producer.MessageQueueSelector;
  6. import org.apache.rocketmq.client.producer.SendResult;
  7. import org.apache.rocketmq.common.message.Message;
  8. import org.apache.rocketmq.common.message.MessageQueue;
  9. import org.apache.rocketmq.remoting.exception.RemotingException;
  10. import java.util.ArrayList;
  11. import java.util.List;
  12. /**
  13. * SEND_OK:消息发送成功
  14. * FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
  15. * FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到 Slave 时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
  16. * SLAVE_NOT_AVAILABLE:消息发送成功,但是此时 Slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
  17. * <p>
  18. * 保证消息的顺序性 全局顺序消息
  19. * 一般通过建立单队列的方式,消息全在一个队列里,保证消息的顺序性(FIFO)
  20. * 局部顺序消息
  21. * 不需要限定单个队列,将需要按序消费的消息,放入到同一个队列中即可。
  22. * 比如:顺序消息A1,B2,C3,D4,都放入到队列1中,就能保证消息的顺序性。
  23. */
  24. public class OrderMsgProducer {
  25. final static String NAMESRV_ADDR = "192.168.0.30:9876";
  26. final static String PRODUCER_GROUP = "order_group";
  27. final static String TOPIC = "order_topic";
  28. final static String TAGS = "order_tags";
  29. public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
  30. sendOrderMsg();
  31. }
  32. /**
  33. * 发送同步消息需要接到消息结果之后再发送下一个消息,这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
  34. */
  35. public static void sendOrderMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
  36. // 创建生产者producer,并指定生产者组名
  37. DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
  38. // 指定NameServer(生产者需要把提供的消息注册到nameServer,方便消费端查找和订阅)
  39. producer.setNamesrvAddr(NAMESRV_ADDR);
  40. producer.start();
  41. List<OrderMsg> orderMsgList = OrderMsg.orderMsgList();
  42. int i = 0;
  43. for (OrderMsg orderMsg : orderMsgList) {
  44. Message message = new Message(TOPIC, TAGS, System.currentTimeMillis() + "" + i, ("hello rocketMq" + i++).getBytes());
  45. // 发送消息
  46. // 参数一:消息对象
  47. // 参数二:消息队列的选择器
  48. // 参数三:选择队列的业务标识(此处为订单ID)
  49. // 其实就是:使用业务标识,进行一定的规则,选出该标识对应存储的队列
  50. SendResult sendResult = producer.send(message, new MessageQueueSelector() {
  51. /**
  52. * @param list 消息队列
  53. * @param message 消息对象(即上面传递过来的message)
  54. * @param o 业务标识的参数(即传递过来的orderId)
  55. * @return: org.apache.rocketmq.common.message.MessageQueue
  56. */
  57. @Override
  58. public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
  59. int index = Math.abs(o.hashCode()) % (list.size());
  60. return list.get(index);
  61. }
  62. }, orderMsg.getId());
  63. // 上面的orderMsg.getId()这个orderid就是用来确认使用的队列,一般是业务订单id,如果是0的话,就是全局顺序
  64. System.out.println("发送结果:" + sendResult);
  65. }
  66. producer.shutdown();
  67. }
  68. }
  /**
   * Simple mutable holder for an order id and a message text, used as sample data by the
   * ordered-message producer.
   */
  class OrderMsg {

      private String id;
      private String msg;

      /**
       * @param id  the order id
       * @param msg the message text
       */
      public OrderMsg(String id, String msg) {
          this.id = id;
          this.msg = msg;
      }

      /**
       * Builds the sample data set: 20 entries that all share the id {@code "12"} and whose
       * message text is the entry index ("0" through "19").
       *
       * @return a new mutable list of 20 sample entries
       */
      public static List<OrderMsg> orderMsgList() {
          List<OrderMsg> samples = new ArrayList<>();
          int index = 0;
          while (index < 20) {
              samples.add(new OrderMsg("12", String.valueOf(index)));
              index++;
          }
          return samples;
      }

      public String getId() {
          return this.id;
      }

      public void setId(String id) {
          this.id = id;
      }

      public String getMsg() {
          return this.msg;
      }

      public void setMsg(String msg) {
          this.msg = msg;
      }
  }

7.2 消费端

  1. package com.zqh.com.rocketmq.orderMsg;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
  5. import org.apache.rocketmq.client.exception.MQClientException;
  6. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import org.springframework.stereotype.Component;
  10. import java.time.LocalDateTime;
  11. @Component
  12. public class OrderMsgConsumer {
  13. private final static Logger logger = LoggerFactory.getLogger(OrderMsgConsumer.class);
  14. private DefaultMQPushConsumer consumer;
  15. private static final String CONSUMER_GROUP = "order_group";
  16. private final static String NAMESRV_ADDR = "192.168.0.30:9876";
  17. private final static String TOPIC = "order_topic";
  18. private final static String TAGS = "order_tags";
  19. public OrderMsgConsumer() throws MQClientException {
  20. System.out.println("初始化成功");
  21. consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
  22. consumer.setNamesrvAddr(NAMESRV_ADDR);
  23. consumer.subscribe(TOPIC, TAGS);
  24. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  25. // 注册消息监听器
  26. // 顺序消息监听 - MessageListenerOrderly 使用该监听器时,只启用一个线程对一个队列进行消费
  27. // 即:一个队列只会被一个线程取到,第二个线程无法访问这个队列(对消费队列上锁,在消费消息之前,先去获取对应队列对应的锁,保证同一个队列不会被并发消费)
  28. consumer.registerMessageListener((MessageListenerOrderly) (list, consumeOrderlyContext) -> {
  29. System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + list.size());
  30. list.forEach(msg -> {
  31. System.out.println(Thread.currentThread().getName() + "\t" + LocalDateTime.now() + "\t\t消费消息:" + new String(msg.getBody()));
  32. }
  33. );
  34. return ConsumeOrderlyStatus.SUCCESS;
  35. });
  36. consumer.start();
  37. }
  38. }

8.1 事务消息

  1. package com.zqh.com.rocketmq.transactionMsg;
  2. import org.apache.rocketmq.client.exception.MQClientException;
  3. import org.apache.rocketmq.client.producer.LocalTransactionState;
  4. import org.apache.rocketmq.client.producer.SendResult;
  5. import org.apache.rocketmq.client.producer.TransactionListener;
  6. import org.apache.rocketmq.client.producer.TransactionMQProducer;
  7. import org.apache.rocketmq.common.message.Message;
  8. import org.apache.rocketmq.common.message.MessageExt;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. /**
  12. * 事务信息
  13. */
  14. public class TransactionMsgProducer {
  15. private final static Logger logger = LoggerFactory.getLogger(TransactionMsgProducer.class);
  16. private static final String PRODUCER_GROUP = "transaction_group";
  17. private final static String NAMESRV_ADDR = "192.168.0.30:9876";
  18. private final static String TOPIC = "transaction_topic";
  19. private final static String TAGS = "transaction_tags";
  20. public static void sendTransactionMsg() throws MQClientException {
  21. TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);
  22. producer.setNamesrvAddr(NAMESRV_ADDR);
  23. producer.setTransactionListener(new TransactionListener() {
  24. /**
  25. * @Description 在该方法中执行本地事务
  26. * @param message 回传的消息,利用transactionId即可获取到该消息的唯一Id
  27. * @param o 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
  28. * @return: org.apache.rocketmq.client.producer.LocalTransactionState
  29. * 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
  30. */
  31. @Override
  32. public LocalTransactionState executeLocalTransaction(Message message, Object o) {
  33. //这边可以创建一个全局共享变量concurrentHashMap,用来存储transactionId以及对应的值(比如正常执行1,异常执行-1等),便于回查时进行判断,这里就不赘述了
  34. System.out.print("执行本地事务" + message.getTransactionId());
  35. int i = (int) o;
  36. if (i % 3 == 0) {
  37. //提交消息
  38. System.out.println("提交");
  39. return LocalTransactionState.COMMIT_MESSAGE;
  40. } else if (i % 3 == 1) {
  41. //回滚消息
  42. System.out.println("回滚");
  43. return LocalTransactionState.ROLLBACK_MESSAGE;
  44. }
  45. //消息回查
  46. System.out.println("回调");
  47. return LocalTransactionState.UNKNOW;
  48. }
  49. /**
  50. * @Description 事务消息状态回查
  51. * @param messageExt 通过获取transactionId来判断这条消息的本地事务执行状态
  52. * @return: org.apache.rocketmq.client.producer.LocalTransactionState
  53. * 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
  54. */
  55. @Override
  56. public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
  57. System.out.println("消息回查:" + messageExt.getTransactionId() + "\t" + new String(messageExt.getBody()));
  58. //可以根据消息体中的信息去数据库中查询该消息是否已经被执行
  59. //或者根据上方执行本地事务时concurrentHashMap中存储的transactionId对应的值进行判断,返回对应的操作值
  60. //这里演示就直接提交了
  61. return LocalTransactionState.COMMIT_MESSAGE;
  62. }
  63. });
  64. // 启动生产者
  65. producer.start();
  66. for (int i = 0; i < 10; i++) {
  67. Message message = new Message(TOPIC, TAGS, System.currentTimeMillis() + "" + i, ("rocketMq" + i).getBytes());
  68. // 发送事务消息
  69. // 参数一:消息对象
  70. // 参数二:附加参数,可用于事务监听器中执行本地事务时的获取参数
  71. SendResult sendResult = producer.sendMessageInTransaction(message, i);
  72. System.out.println("发送消息:" + sendResult);
  73. }
  74. // 由于MQ要回查生产者,所以不需要关闭生产者
  75. // producer.shutdown();
  76. }
  77. public static void main(String[] args) throws Exception {
  78. sendTransactionMsg();
  79. }
  80. }

8.2 消费端

  1. package com.zqh.com.rocketmq.transactionMsg;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  4. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  5. import org.apache.rocketmq.client.exception.MQClientException;
  6. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import org.springframework.stereotype.Component;
  10. @Component
  11. public class TransactionMsgConsumer {
  12. private final static Logger logger = LoggerFactory.getLogger(TransactionMsgConsumer.class);
  13. private DefaultMQPushConsumer consumer;
  14. private static final String CONSUMER_GROUP = "transaction_group";
  15. private final static String NAMESRV_ADDR = "192.168.0.30:9876";
  16. private final static String TOPIC = "transaction_topic";
  17. private final static String TAGS = "transaction_tags";
  18. public TransactionMsgConsumer() throws MQClientException {
  19. System.out.println("初始化成功");
  20. consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
  21. consumer.setNamesrvAddr(NAMESRV_ADDR);
  22. consumer.subscribe(TOPIC, TAGS);
  23. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  24. // 并发消费模式 (MessageListenerConcurrently)
  25. consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
  26. System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + list.size());
  27. list.forEach(msg -> {
  28. // 业务实现
  29. System.out.println(Thread.currentThread().getName() + "\t" + "组1消费" + msg.getKeys() + new String(msg.getBody()));
  30. });
  31. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  32. });
  33. consumer.start();
  34. }
  35. }

题外话

1. 事务流程图

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/256912
推荐阅读
相关标签
  

闽ICP备14008679号