当前位置:   article > 正文

springboot2.x版本整合RocketMQ4.9.3_springboot2.x集成rocketmq4.9

springboot2.x集成rocketmq4.9

一、在pom.xml中添加RocketMQ客户端依赖包

  1. <!-- rocketmq服务端版本4.9.3 -->
  2. <dependency>
  3. <groupId>org.apache.rocketmq</groupId>
  4. <artifactId>rocketmq-spring-boot-starter</artifactId>
  5. <version>2.2.2</version>
  6. </dependency>

二、在springboot的配置文件中添加如下配置,将namesrv相应ip和端口修改为自己的ip和端口,注意rocketMQ集群,多个ip:port使用分号分隔

  1. #rokectmq连接信息配置和默认分组
  2. rocketmq:
  3. name-server: 192.168.1.208:9876;192.168.1.208:9877
  4. producer:
  5. group: guizhou-producer-ext
  6. consumer:
  7. group: guizhou-consumer-ext

三、创建生产者类,发送消息

  1. package com.dzt.manager.message.producer;
  2. import com.alibaba.fastjson.JSON;
  3. import com.dzt.constant.RocketMqConst;
  4. import org.apache.rocketmq.client.producer.SendCallback;
  5. import org.apache.rocketmq.client.producer.SendResult;
  6. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import org.springframework.messaging.support.MessageBuilder;
  10. import org.springframework.stereotype.Service;
  11. import javax.annotation.Resource;
  12. import java.util.HashMap;
  13. import java.util.Map;
  14. import java.util.UUID;
  15. /**
  16. * @author dzt
  17. */
  18. @Service
  19. public class LiveSenderService {
  20. public static final Logger log = LoggerFactory.getLogger(LiveSenderService.class);
  21. @Resource
  22. private RocketMQTemplate rocketMQTemplate;
  23. /**
  24. * 普通发送(这里的参数对象User可以随意定义,可以发送个对象,也可以是字符串等)
  25. */
  26. public void send() {
  27. Map<String, String> map = new HashMap<>();
  28. map.put("orderId", UUID.randomUUID().toString());
  29. rocketMQTemplate.convertAndSend(RocketMqConst.topic_name + ":tag1", map);
  30. // 等价于上面一行
  31. //rocketMQTemplate.send(topic_name + ":tag1",
  32. MessageBuilder.withPayload(map).build());
  33. }
  34. /**
  35. * 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)
  36. * (msgBody也可以是对象,sendResult为返回的发送结果)
  37. */
  38. public SendResult sendMsg(String msgBody) {
  39. SendResult sendResult = rocketMQTemplate.syncSend(RocketMqConst.topic_name,
  40. MessageBuilder.withPayload(msgBody).build());
  41. log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
  42. return sendResult;
  43. }
  44. /**
  45. * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理
  46. * 相关成功失败时的逻辑)
  47. * (适合对响应时间敏感的业务场景)
  48. */
  49. public void sendAsyncMsg(String msgBody) {
  50. rocketMQTemplate.asyncSend(RocketMqConst.topic_name,
  51. MessageBuilder.withPayload(msgBody).build(), new SendCallback() {
  52. @Override
  53. public void onSuccess(SendResult sendResult) {
  54. // 处理消息发送成功逻辑
  55. log.info("消息发送成功,响应结果:{}", sendResult);
  56. }
  57. @Override
  58. public void onException(Throwable throwable) {
  59. // 处理消息发送异常逻辑,可以根据异常确定是否要进行重试或其他业务
  60. log.info("消息发送失败,出现异常", throwable);
  61. }
  62. });
  63. }
  64. /**
  65. * 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时)
  66. * 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m
  67. 9m 10m 20m 30m 1h 2h
  68. */
  69. public void sendDelayMsg(String msgBody, int delayLevel) {
  70. rocketMQTemplate.syncSend(RocketMqConst.topic_name,
  71. MessageBuilder.withPayload(msgBody).build(), 30 * 1000, delayLevel);
  72. }
  73. /**
  74. * 发送单向消息(只负责发送消息,不等待应答,不关心发送结果,如日志)
  75. */
  76. public void sendOneWayMsg(String msgBody) {
  77. rocketMQTemplate.sendOneWay(RocketMqConst.topic_name,
  78. MessageBuilder.withPayload(msgBody).build());
  79. }
  80. /**
  81. * 发送带tag的消息,直接在topic_name后面加上":tag"
  82. */
  83. public SendResult sendTagMsg(String msgBody) {
  84. return rocketMQTemplate.syncSend(RocketMqConst.topic_name + ":tag2",
  85. MessageBuilder.withPayload(msgBody).build());
  86. }
  87. public void sendOrderMsg(String msgBody) {
  88. log.info("开始发送顺序消息......");
  89. rocketMQTemplate.syncSendOrderly(RocketMqConst.topic_name, msgBody, "order");
  90. log.info("结束发送顺序消息......");
  91. }
  92. }

四、创建消费者类,消费消息

  1. package com.dzt.manager.message.consumer;
  2. import com.dzt.constant.RocketMqConst;
  3. import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
  4. import org.apache.rocketmq.spring.core.RocketMQListener;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.stereotype.Service;
  8. /**
  9. * @author dzt
  10. * 消费模式默认为集群模式,即同一个消费者组内,多个实例负载均衡后,只有一个实例消费一次
  11. * 如果需要使用广播模式,只需要添加该属性:messageModel = MessageModel.BROADCASTING
  12. 泛型可以根据消息的具体类型酌情修改
  13. */
  14. @RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = RocketMqConst.topic_name)
  15. @Service
  16. public class LiveReceiveService implements RocketMQListener<String> {
  17. public static final Logger log = LoggerFactory.getLogger(LiveReceiveService.class);
  18. @Override
  19. public void onMessage(String message) {
  20. log.info("接收到消息:{}",message);
  21. }
  22. }

五、用到的常量类

  1. package com.dzt.constant;
  2. /**
  3. * @author dzt
  4. */
  5. public class RocketMqConst {
  6. /**
  7. * 同一项目统一使用一个topic_name
  8. */
  9. public static final String topic_name = "dzt-ext-sys";
  10. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/713038
推荐阅读
相关标签
  

闽ICP备14008679号