当前位置:   article > 正文

spring boot 集成rocketMq + 基本使用

spring boot 集成rocketMq + 基本使用

1. RocketMq基本概念

  1. 1. NameServer
  2. 每个NameServer结点之间是相互独立,彼此没有任何信息交互
  3. 启动NameServer。NameServer启动后监听端口,等待Broker、Producer、Consumer连接,
  4. 相当于一个路由控制中心。主要是用来保存topic路由信息,管理Broker
  5. 2. Broker
  6. 消息存储和中转角色,负责存储和转发消息
  7. 在启动时会向NameServer进行注册并且定时发送心跳包。心跳包中包含当前 Broker 信息
  8. 以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic跟Broker 的映射关系。
  9. 3. topic : 一个消息的集合的名字
  10. 创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建Topic。
  11. 4. 生产者
  12. 生产者发送消息。启动时先从 NameServer 集群中的其中一台拉取到路由表,缓存到本地,
  13. 并从 NameServer 中获取当前发送的 Topic存在于哪些 Broker 上,
  14. 轮询从队列列表中选择一个队列(默认轮询)
  15. 5. 消费者
  16. 消费者跟其中一台NameServer建立连接,获取当前订阅Topic存在哪些Broker上,
  17. 然后直接跟Broker建立连接通道,然后开始消费消息

2. maven 引入starter

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.2.2</version>
  5. </dependency>

3.yml配置

3.1 生产者yml 配置

  1. rocketmq:
  2. name-server: 127.0.0.1:9876
  3. producer:
  4. group: my-group
  5. # 发送消息超时时间
  6. send-message-timeout: 5000
  7. # 发送消息失败重试次数
  8. retry-times-when-send-failed: 2
  9. retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

3.2 消费者yml 配置

  1. rocketmq:
  2. name-server: 127.0.0.1:9876
  3. consumer:
  4. topic: topic_test
  5. group: consumer_my-group

4.生产者发送消息

4.1 一般消息

  1. @Resource
  2. private RocketMQTemplate rocketMQTemplate;
  3. /**
  4. * 一般消息
  5. * Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。
  6. * 使用 Tag 可以实现对 Topic 中的消息进行过滤。
  7. * **/
  8. @GetMapping("/send")
  9. public String send(){
  10. rocketMQTemplate.convertAndSend("topic_test", "Hello, World!");
  11. rocketMQTemplate.convertAndSend("topic_test:tagB","Hello, World222--tagB");
  12. return "rocketMq普通消息发送完成";
  13. }

4.2 顺序消息

  1. /** 支持消费者按照发送消息的先后顺序获取消息 */
  2. @GetMapping("/send/orderly")
  3. public String sendOrder(){
  4. //发送顺序消息,参数:topic,消息,hashkey,相同hashkey发送至同一个队列
  5. rocketMQTemplate.syncSendOrderly("topic_test:tagA", MessageBuilder.withPayload("消息编号" + 1).build(),"queue");
  6. rocketMQTemplate.syncSendOrderly("topic_test:tagA", MessageBuilder.withPayload("消息编号" + 2).build(),"queue");
  7. return "rocketMq顺序-消息发送成功";
  8. }

4.3 同步消息

  1. @GetMapping("/send/sync")
  2. public String sendMsg() {
  3. String message = "我是同步消息:" + LocalDateTime.now();
  4. SendResult result = rocketMQTemplate.syncSend("topic_test:tagA", MessageBuilder.withPayload(message).build());
  5. log.info("同步-消息发送成功:" + LocalDateTime.now());
  6. return "rocketMq 同步-消息发送成功:" + result.getSendStatus();
  7. }

4.4 异步消息

  1. /** 发送异步消息 */
  2. @GetMapping("/send/async")
  3. public String asyncSendMsg(){
  4. String message = "我是异步消息:" + LocalDateTime.now();
  5. rocketMQTemplate.asyncSend("topic_test:tagA",message,new SendCallback() {
  6. @Override
  7. public void onSuccess(SendResult sendResult) {
  8. log.info("发送成功 (后执行),SendStatus = {}",sendResult.getSendStatus());
  9. }
  10. @Override
  11. public void onException(Throwable throwable) {
  12. log.info("发送失败 (后执行)");
  13. }
  14. });
  15. return "rocketMq 异步-消息发送成功:" + LocalDateTime.now();
  16. }

 4.5 单向消息:一般用来发送日志等不重要的消息

  1. @GetMapping("/send/oneWay")
  2. public String sendOneWayMessage() {
  3. String message = "我是单向消息:"+LocalDateTime.now();
  4. this.rocketMQTemplate.sendOneWay("topic_test:tagA", message);
  5. log.info("单向发送消息完成:message = {}", message);
  6. return "rocketMq 单向-消息发送成功:" + LocalDateTime.now();
  7. }

 

4.6 延时消息

  1. /** 延时消息 */
  2. @GetMapping("/sendDelay")
  3. public String sendDelay(){
  4. String message = "我是延时消息:" + LocalDateTime.now();
  5. // 第四个参数为延时级别,分为1-18:1、5、10、30、1m、2m、3m、...10m、20m、30m、1h、2h
  6. rocketMQTemplate.syncSend("topic_test:tagC", MessageBuilder.withPayload(message).build(), 3000, 2);
  7. return "rocketMq延时-消息发送成功";
  8. }

4.7 事务消息

4.7.1 事务消息发送代码

  1. /** 事务消息 */
  2. @GetMapping("/send/transaction/{id}")
  3. public void sendTransactionMessage(@PathVariable("id") Integer id){
  4. //发送事务消息:采用的是sendMessageInTransaction方法,返回结果为TransactionSendResult对象,该对象中包含了事务发送的状态、本地事务执行的状态等
  5. //参数一:topic;参数二:消息
  6. // 事务id
  7. String[] tags = {"tagA", "tagB", "tagC"};
  8. int i = id%3;
  9. String transactionId = UUID.randomUUID().toString();
  10. String message = "我是事务消息:" + LocalDateTime.now();
  11. TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("topic_test:" + tags[i]
  12. , MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId).build(),
  13. // 给本地事务的参数
  14. 2);
  15. //发送状态
  16. String sendStatus = result.getSendStatus().name();
  17. //本地事务执行状态
  18. String localState = result.getLocalTransactionState().name();
  19. log.info("发送状态:"+sendStatus+";本地事务执行状态"+localState);
  20. }

4.7.2 继承 RocketMQLocalTransactionListener

  1. @Slf4j
  2. @RocketMQTransactionListener
  3. public class MyTransactionListener implements RocketMQLocalTransactionListener {
  4. @Override
  5. public RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message message, Object o) {
  6. MessageHeaders headers = message.getHeaders();
  7. //获取事务ID
  8. String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
  9. log.info("执行本地事务 ,transactionId is {}, orderId is {}",transactionId, message.getHeaders().get("rocketmq_TAGS"));
  10. try{
  11. //模拟网络波动
  12. Thread.sleep(3000);
  13. /***
  14. * 首先发送一个半消息(half message),这个消息不会立即投递给消费者;然后执行本地事务(比如数据库操作)。
  15. * 根据本地事务的执行结果,决定是提交(commit)还是回滚(rollback)这个消息。
  16. * 如果本地事务成功,消息会被提交并发送给消费者;
  17. * 如果失败,消息会被回滚,消费者不会接收到这个消息
  18. */
  19. }catch (Exception e){
  20. return RocketMQLocalTransactionState.ROLLBACK;
  21. }
  22. // 执行本地事务
  23. String tag = String.valueOf(message.getHeaders().get("rocketmq_TAGS"));
  24. if (StringUtils.equals("tagA", tag)){
  25. //这里只讲TAGA消息提交,状态为可执行
  26. return RocketMQLocalTransactionState.COMMIT;
  27. }else if (StringUtils.equals("tagB", tag)) {
  28. return RocketMQLocalTransactionState.ROLLBACK;
  29. } else if (StringUtils.equals("tagC",tag)) {
  30. return RocketMQLocalTransactionState.UNKNOWN;
  31. }
  32. log.info("事务提交,消息正常处理: " + LocalDateTime.now());
  33. //执行成功,可以提交事务
  34. return RocketMQLocalTransactionState.COMMIT;
  35. }
  36. @Override
  37. public RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message message) {
  38. MessageHeaders headers = message.getHeaders();
  39. //获取事务ID
  40. String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
  41. log.info(transactionId + ",消息回查"+ LocalDateTime.now());
  42. return RocketMQLocalTransactionState.ROLLBACK;
  43. }
  44. }

tagA、tagB、tagC 三种事务消息,只有Commit的才能发送到broker 

 

 5. 消费端

  1. /**
  2. * topic指定消费的主题,consumerGroup指定消费组,
  3. * 一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费
  4. * 2.实现RocketMQListener接口
  5. * 如果想拿到消息的其他参数可以写成MessageExt
  6. * selectorExpression = "tagA || tagB" 指定tag 的消费
  7. */
  8. @Service
  9. @Slf4j
  10. @RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}")
  11. public class RocketMqConsumer implements RocketMQListener<String>{
  12. @Override
  13. public void onMessage(String s) {
  14. log.info("topic_test: 所有的收到消息:"+s);
  15. }
  16. }

6.广播消费模式

生产端是一样的,但是消费端需要增加一个参数

messageModel:设置消费模式,取值范围CLUSTERING(集群消费)、BROADCASTING(广播消费)
  1. @Service
  2. @Slf4j
  3. @RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.BROADCASTING)
  4. public class RocketMqConsumer implements RocketMQListener<String>{
  5. @Override
  6. public void onMessage(String s) {
  7. log.info("consumer2---topic_test: 所有的收到消息:"+s);
  8. }
  9. }
  10. // 第2个消费者类,他们都是一样的代码,
  11. //为了表示广播,就是一个消息,会被这两个消费者消费
  12. @Service
  13. @Slf4j
  14. @RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.BROADCASTING)
  15. public class RocketMqConsumer implements RocketMQListener<String>{
  16. @Override
  17. public void onMessage(String s) {
  18. log.info("consumer1--topic_test: 所有的收到消息:"+s);
  19. }
  20. }

7.其他

RocketMQ 通过消费者组(Consumer Group)来维护不同消费者的消费进度。每个消费者组都有一个消费进度(offset),用于标记该组下的消费者在某个主题(Topic)和队列(Queue)上已经消费到的位置。所以:不同的消费者组会被视为不同的消费者

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/411668
推荐阅读
相关标签
  

闽ICP备14008679号