当前位置:   article > 正文

构建spring boot web项目:六、集成RabbitMQ_项目集成rabbitmq

项目集成rabbitmq

目录

一、docker安装RabbitMQ

1.1拉去镜像

1.2 启动容器

二、spring boot 整合RabbitMQ

2.1创建rabbitMQ模块

2.2引入依赖

2.3配置rabbitMQ

三、管理应用模块(management)集成rabbitMQ

一、docker安装RabbitMQ

1.1拉去镜像

docker pull rabbitmq:3-management

1.2 启动容器

docker run -dit --restart=always  --name rabbitmq -p 5672:5672 -p 15672:15672  --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=master  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management

-p:指定服务运行的端口(15672:UI页面通信口,浏览器界面、5672:client端通信口,最常用到的);
–hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);
-e:指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)

查看

docker ps 

登录ip:15672/

二、spring boot 整合RabbitMQ

2.1创建rabbitMQ模块

2.2引入依赖

pom.xml(rabbitMQ)

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>com.lyj.initMode</groupId>
  7. <artifactId>initMode-function</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. </parent>
  10. <artifactId>initMode-function-rabbitMQ</artifactId>
  11. <description>rabbitMQ功能模块</description>
  12. <dependencies>
  13. <!--rabbitmq-->
  14. <dependency>
  15. <groupId>org.springframework.boot</groupId>
  16. <artifactId>spring-boot-starter-amqp</artifactId>
  17. </dependency>
  18. </dependencies>
  19. </project>

pom.xml(initMode)

  1. <!--rabbitmq 消息队列-->
  2. <rabbitmq.version>2.3.7.RELEASE</rabbitmq.version>
  3. <dependency>
  4. <groupId>com.lyj.initMode</groupId>
  5. <artifactId>initMode-function-rabbitMQ</artifactId>
  6. <version>${project.version}</version>
  7. </dependency>
  8. <dependency>
  9. <groupId>org.springframework.boot</groupId>
  10. <artifactId>spring-boot-starter-amqp</artifactId>
  11. <version>${rabbitmq.version}</version>
  12. </dependency>

pom.xml(function)

<module>rabbitMQ</module>

2.3配置rabbitMQ

RabbitMqConfig.java
  1. package com.lyj.function.rabbitmq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.amqp.rabbit.annotation.EnableRabbit;
  4. import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
  5. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
  6. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  7. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  8. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  9. import org.springframework.amqp.rabbit.retry.MessageRecoverer;
  10. import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
  11. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  12. import org.springframework.amqp.support.converter.MessageConverter;
  13. import org.springframework.context.annotation.Bean;
  14. import org.springframework.context.annotation.Configuration;
  15. import java.util.HashMap;
  16. import java.util.Map;
  17. /**
  18. * rabbitMq配置
  19. *
  20. */
  21. @Configuration
  22. @EnableRabbit //开启基于注解的RabbitMQ模式
  23. public class RabbitMqConfig {
  24. private final CachingConnectionFactory connectionFactory;
  25. public RabbitMqConfig(CachingConnectionFactory connectionFactory) {
  26. this.connectionFactory = connectionFactory;
  27. }
  28. /**
  29. * 消息监听器工厂
  30. *
  31. * @return
  32. */
  33. @Bean(name = "mqListenerContainer")
  34. public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory() {
  35. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  36. factory.setConnectionFactory(connectionFactory);
  37. // 限流 一次性从队列中最大能拉取消息数
  38. factory.setPrefetchCount(50);
  39. return factory;
  40. }
  41. //其他方式声明监听器
  42. /*
  43. public SimpleMessageListenerContainer getObject() throws Exception {
  44. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  45. container.setAmqpAdmin(amqpAdmin);
  46. container.setConnectionFactory(connectionFactory);
  47. container.setQueues(queue);
  48. container.setPrefetchCount(20);
  49. container.setConcurrentConsumers(20);
  50. container.setMaxConcurrentConsumers(100);
  51. container.setDefaultRequeueRejected(Boolean.FALSE);
  52. container.setAdviceChain(createRetry());
  53. container.setAcknowledgeMode(autoAck ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL);
  54. // container.stop();
  55. if (Objects.nonNull(consumer)) {
  56. container.setMessageListener(consumer);
  57. }
  58. return container;
  59. }*/
  60. /**
  61. * 动态生成队列
  62. * @param connectionFactory
  63. * @return
  64. */
  65. @Bean
  66. public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
  67. return new RabbitAdmin(connectionFactory);
  68. }
  69. /**
  70. * 声明队列
  71. *
  72. * @return
  73. */
  74. @Bean
  75. public Queue createQueue() {
  76. /*
  77. * 第一种方式:
  78. * durable():代表需要持久化
  79. * exclusive(): 代表该队列独占(只允许有一个consumer监听)
  80. * autoDelete(): 代表需要自动删除(没有consumer自动删除)
  81. * withArgument(): 队列的其他参数
  82. */
  83. // return QueueBuilder.durable("boot_work_queue").exclusive().autoDelete().withArgument("key", "val").build();
  84. /*
  85. * 第二种方式:通过new Queue对象来创建队列
  86. * name:队列名称
  87. * durable:队列是否持久化,默认是true
  88. * exclusive:是否独占,队列是否设置为排他队列,默认是false。为true时设置为排他队列,只对首次声明它的连接可见,
  89. * 其他连接无法声明相同名称的其他队列,并且在连接断开时自动删除,即使持久化也会被删除
  90. * autoDelete:队列是否自动删除,默认false。为true时,当没有消费者使用此队列,该队列会自动删除
  91. * 一般设置一下队列的持久化就好,其余两个就是默认false
  92. */
  93. return new Queue("chat.room.queue", true, false, false);
  94. }
  95. /**
  96. * 声明死信队列
  97. *它用于处理无法成功被消费的消息。当消息无法被消费者正常处理时,通常会被发送到死信队列,以后进行进一步的处理或分析
  98. * @return
  99. */
  100. @Bean
  101. public Queue deadQueue() {
  102. Map<String, Object> map = new HashMap<>();
  103. // 队列中的每一个消息未被消费则5秒后过期,被自动删除并移到死信队列
  104. map.put("x-message-ttl", 5000);
  105. return new Queue("chat.dead.queue", true, false, false, map);
  106. }
  107. /**
  108. * 声明发布订阅模式交换机
  109. *将消息广播到与交换机绑定的所有队列
  110. * @return
  111. */
  112. @Bean
  113. FanoutExchange fanoutExchange() {
  114. /*
  115. * 第一种方式: 通过ExchangeBuilder构建交换机
  116. * 通过ExchangeBuilder声明交换机
  117. * 每种类型交换机有对应方法,如:fanoutExchange()、topicExchange()
  118. * - durable: 是否持久化
  119. * - autoDelete: 是否自动删除
  120. * - withArgument: 交换机其他参数
  121. * */
  122. // return ExchangeBuilder.fanoutExchange("boot_fanout_exchange").durable(true).build();
  123. // return ExchangeBuilder.directExchange("boot_direct_exchange").durable(true).autoDelete().withArgument("key","val").build();
  124. /*
  125. * 第二种方式:通过new FanoutExchange对象声明交换机
  126. * name:交换机名称
  127. * durable:是否持久化(默认false)
  128. * autoDelete:是否自动删除(默认false)
  129. * */
  130. return new FanoutExchange("fanout.exchange", true, false);
  131. }
  132. /**
  133. * 声明路由模式交换机
  134. *根据消息的路由键将消息发送到特定队列
  135. * @return
  136. */
  137. @Bean
  138. DirectExchange directExchange() {
  139. return new DirectExchange("direct.exchange", true, false);
  140. }
  141. /**
  142. * 声明主题模式交换机
  143. * 根据消息的路由键和通配符匹配将消息发送到多个队列
  144. */
  145. @Bean
  146. TopicExchange topicExchange() {
  147. return new TopicExchange("topic.exchange", true, false);
  148. }
  149. /**
  150. * 声明头交换机
  151. * 根据消息的自定义头部属性进行匹配路由
  152. */
  153. @Bean
  154. HeadersExchange headerExchange() {
  155. return new HeadersExchange("header.exchange", true, false);
  156. }
  157. /**
  158. * 交换机与队列进行绑定
  159. * 绑定成功后会持久化下来 需要手动解绑
  160. */
  161. @Bean
  162. public Binding bindQueueExchange() {
  163. /*
  164. * 第一种方式: 通过BindingBuilder绑定
  165. *
  166. * bind(Queue): 需要绑定的queue
  167. * to(Exchange): 需要绑定到哪个交换机
  168. * with(String): routing key
  169. * noargs(): 进行构建
  170. */
  171. // return BindingBuilder.bind(testQueue()).to(directExchange()).with("article").noargs();
  172. // return BindingBuilder.bind(testQueue()).to(directExchange()).with(testQueue().getName());
  173. /*
  174. * 第二种方式:通过new Binding对象绑定
  175. *
  176. * destination: 绑定的队列
  177. * destinationType: 绑定的类型 Binding.DestinationType.QUEUE: 绑定的类型为queue(交换机不仅可以绑定queue还可以绑定exchange)
  178. * exchange: 哪个交换机需要绑定
  179. * routingKey: routing key 路由键 Routing Key的作用是根据一定的规则将消息发送到匹配的队列中
  180. * arguments: 其他参数
  181. */
  182. return new Binding("chat.room.queue", Binding.DestinationType.QUEUE, "fanout.exchange", "chat.room.key", null);
  183. }
  184. /**
  185. * json格式消息转换
  186. *
  187. * @return
  188. */
  189. @Bean
  190. public MessageConverter jsonMessageConverter() {
  191. return new Jackson2JsonMessageConverter();
  192. }
  193. /**
  194. * 声明消费者失败失败处理队列
  195. * @return
  196. */
  197. @Bean
  198. public Queue errorQueue() {
  199. return new Queue("error.routing.key", true, false, false);
  200. }
  201. /**
  202. * 将消费者失败失败处理策略改为RepublishMessageRecoverer,首先定义接收失败消息的交换机、队列及其绑定关系;然后定义RepublishMessageRecoverer
  203. * @param rabbitTemplate
  204. * @return
  205. */
  206. @Bean
  207. public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
  208. return new RepublishMessageRecoverer(rabbitTemplate, "direct.exchange", "error.routing.key");
  209. }
  210. }
MQReturnCallbackCommonConfig.java
  1. package com.lyj.function.rabbitmq.config;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.BeansException;
  5. import org.springframework.context.ApplicationContext;
  6. import org.springframework.context.ApplicationContextAware;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. * rabbitmq Return机制
  10. * 每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置
  11. * @author faminefree
  12. */
  13. @Slf4j
  14. @Configuration
  15. public class MQReturnCallbackCommonConfig implements ApplicationContextAware {
  16. @Override
  17. public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
  18. // 获取RabbitTemplate
  19. RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
  20. // 设置ReturnCallback
  21. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{
  22. log.info("消息发送失败,应答码:{},原因:{},交换机:{},路由键:{},消息:{}",replyCode,replyText,exchange,routingKey,message.toString());
  23. });
  24. }
  25. }

三、管理应用模块(management)集成rabbitMQ

pom.xml(management)

  1. <dependency>
  2. <groupId>com.lyj.initMode</groupId>
  3. <artifactId>initMode-function-rabbitMQ</artifactId>
  4. </dependency>

application.yml

  1. spring:
  2. rabbitmq: #rabbitMQ
  3. host: 192.168.163.158 #主机地址
  4. username: admin #用户名guest
  5. password: admin #密码guest
  6. port: 5672 #默认端口5672
  7. virtual-host: master #虚拟主机
  8. #生产者重连
  9. template:
  10. retry:
  11. enabled: true #开启超时重试机制 重试机制是阻塞式的重试 如果对于业务性能有要求,建议禁用重试机制。如果需要使用,需要合理配置等待时长和重试次数,也可以考虑使用异步线程来执行发送消息。
  12. initial-interval: 1000 #失败后的初始等待时间
  13. multiplier: 1 #失败后下次的等待时长倍数,下次等待时长=initial-interval * multiplier
  14. max-attempts: 3 #最大重试次数
  15. #生产者确认机制
  16. #rabbitmq客户端发送消息首先发送的交换器exchange,然后通过路由键routingKey和bindingKey比较判定需要将消息发送到那个队列queue上;
  17. #在这个过程有两个地方消息可能丢失,第一消息发送到交换器exchange的过程,第二消息从交换器exchange发送到队列queue的过程;
  18. #publiser-confirm模式可以确保生产者到交换器exchange消息有没有发送成功
  19. #publisher-return模式可以在消息没有被路由到指定的queue时将消息返回,而不是丢弃
  20. #publisher-confirm-type有三种模式:
  21. #none:关闭confirm机制
  22. #simple:同步阻塞等待MQ的回执消息
  23. #correlated:MQ异步回调方式返回回执消息
  24. publisher-confirm-type: correlated #开启publisher confirm消息确认模式,并设置confirm类型
  25. publisher-returns: true #开启publisher return机制,一般不会开启 RabbitTemplate只能配置一个ReturnCallback
  26. #消费者确认机制
  27. listener:
  28. simple:
  29. prefetch: 1
  30. acknowledge-mode: none # none:关闭ack;manual:手动ack;auto:自动ack
  31. #消费者失败重试机制
  32. #失败消息处理策略
  33. #在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
  34. #RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式。
  35. #ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。
  36. #RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。
  37. retry:
  38. enabled: true # 开启消费者失败重试
  39. initial-interval: 1000ms # 初始的失败等待时长为1秒
  40. multiplier: 1 # 下次失败的等待时长倍数,下次等待时长=multiplier*lost-interval
  41. max-attempts: 3 # 最大重试次数
  42. stateless: true # true:无状态;false:有状态。如果业务中包含事务,这里改为false

测试类

  1. package com.lyj.service.management.api;
  2. import cn.hutool.core.lang.UUID;
  3. import com.lyj.common.base.common.R;
  4. import io.swagger.annotations.Api;
  5. import io.swagger.annotations.ApiOperation;
  6. import lombok.AllArgsConstructor;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.amqp.core.*;
  9. import org.springframework.amqp.rabbit.connection.CorrelationData;
  10. import org.springframework.amqp.rabbit.core.RabbitAdmin;
  11. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  12. import org.springframework.util.concurrent.ListenableFutureCallback;
  13. import org.springframework.web.bind.annotation.GetMapping;
  14. import org.springframework.web.bind.annotation.PostMapping;
  15. import org.springframework.web.bind.annotation.RequestMapping;
  16. import org.springframework.web.bind.annotation.RestController;
  17. /**
  18. * RabbitMq动态配置
  19. * 动态生成队列和交换机
  20. *
  21. */
  22. @AllArgsConstructor
  23. @RestController
  24. @RequestMapping("/api/test/mq")
  25. @Slf4j
  26. @Api(tags = "动态操作MQ队列交换机")
  27. public class TestRabbitMQApi {
  28. /**
  29. * 实现了AmqpAdmin接口
  30. */
  31. private RabbitAdmin rabbitAdmin;
  32. /**
  33. * 通过AmqpAdmin:创建和删除 Queue,Exchange,Binding
  34. */
  35. private AmqpAdmin amqpAdmin;
  36. private final RabbitTemplate rabbitTemplate;
  37. @PostMapping("/dynamic")
  38. @ApiOperation("绑定交换机和队列")
  39. public void dynamicConfig() {
  40. //创建mq队列
  41. Queue test3Queue = new Queue("test3_Queue", true, false, false);
  42. rabbitAdmin.declareQueue(test3Queue);
  43. //创建交换机
  44. DirectExchange direct1Exchange = new DirectExchange("direct1_Exchange", true, false);
  45. rabbitAdmin.declareExchange(direct1Exchange);
  46. //绑定交换机和队列,并设置Routing key
  47. Binding binding = BindingBuilder.bind(test3Queue).to(direct1Exchange).with(test3Queue.getName());
  48. rabbitAdmin.declareBinding(binding);
  49. }
  50. /**
  51. * 解绑队列与交换机
  52. */
  53. @GetMapping("/removeBinding")
  54. @ApiOperation("解绑队列与交换机")
  55. public void removeBinding() {
  56. Binding binding = new Binding("chat.room.queue", Binding.DestinationType.QUEUE, "fanout.exchange", "chat.room.key", null);
  57. rabbitAdmin.removeBinding(binding);
  58. }
  59. /**
  60. * 删除mq队列
  61. *
  62. * @return
  63. */
  64. @PostMapping("/deleteMq")
  65. @ApiOperation("删除mq队列")
  66. public String deleteMq(String mq) {
  67. rabbitAdmin.deleteQueue(mq);
  68. return "ok";
  69. }
  70. @PostMapping("/")
  71. @ApiOperation("绑定交换机和队列2")
  72. public void amqpAdminCreate(){
  73. // 创建Exchange
  74. amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
  75. /*
  76. * 创建消息队列
  77. * - Queue是类可以直接new,构造器第一个参数:队列名,第二参数:是否持久化存在,若没有指定参数则随机给队列名
  78. * */
  79. amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
  80. /*
  81. * 创建绑定规则
  82. * - 参数1:目的地(队列)
  83. * - 参数2:绑定的类型->队列
  84. * - 参数3:Exchange
  85. * - 参数4:路由件
  86. * - 参数5:参数没有为null
  87. * */
  88. amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null));
  89. //amqpAdmin.deleteExchange(); // 删除交换器
  90. //amqpAdmin.deleteQueue(); // 删除队列
  91. }
  92. @GetMapping("/test")
  93. @ApiOperation("测试MQ")
  94. public R<Boolean> testPublisherConfirm() throws InterruptedException {
  95. // 创建CorrelationData
  96. CorrelationData cd = new CorrelationData(UUID.fastUUID().toString());
  97. // Future添加ConfirmCallback
  98. cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
  99. @Override
  100. public void onFailure(Throwable e) {
  101. // Future发送异常时的处理逻辑,属于spring处理异常,不是mq返回的失败,基本不会触发
  102. log.error("handle message ack fail", e);
  103. }
  104. @Override
  105. public void onSuccess(CorrelationData.Confirm result) {
  106. // Future接收到回执的处理逻辑,参数中的result就是回执内容
  107. if (result.isAck()) {
  108. log.debug("发送消息成功,收到ack...");
  109. } else {
  110. log.error("发送消息失败,收到nack, reason:{}", result.getReason());
  111. }
  112. }
  113. });
  114. // 发送消息
  115. rabbitTemplate.convertAndSend("fanout.exchange", "chat.room.key", "hello", cd);
  116. return R.ok(Boolean.TRUE);
  117. }
  118. }

测试监听类

  1. package com.lyj.service.management.api;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * 使用@RabbitListener注解方法上监听消息
  7. *
  8. */
  9. @Component
  10. @Slf4j
  11. public class TestMqReceiver {
  12. /**
  13. * 监听指定的消息队列(数组类型,可以指定监听多个)
  14. *
  15. * @param s
  16. */
  17. @RabbitListener(queues = {"chat.room.queue"})
  18. public void receive(String s) {
  19. log.info("使用@RabbitListener注解方法上监听消息:{}", s);
  20. }
  21. }

启动测试

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/911347
推荐阅读
相关标签
  

闽ICP备14008679号