赞
踩
目录
三、管理应用模块(management)集成rabbitMQ
docker pull rabbitmq:3-management
docker run -dit --restart=always --name rabbitmq -p 5672:5672 -p 15672:15672 --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=master -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
-p:指定服务运行的端口(15672:UI页面通信口,浏览器界面、5672:client端通信口,最常用到的);
–hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);
-e:指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)
查看
docker ps
登录ip:15672/
pom.xml(rabbitMQ)
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>com.lyj.initMode</groupId>
- <artifactId>initMode-function</artifactId>
- <version>1.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>initMode-function-rabbitMQ</artifactId>
-
- <description>rabbitMQ功能模块</description>
-
- <dependencies>
- <!--rabbitmq-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- </dependencies>
-
- </project>
pom.xml(initMode)
-
- <!--rabbitmq 消息队列-->
- <rabbitmq.version>2.3.7.RELEASE</rabbitmq.version>
-
- <dependency>
- <groupId>com.lyj.initMode</groupId>
- <artifactId>initMode-function-rabbitMQ</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- <version>${rabbitmq.version}</version>
- </dependency>
pom.xml(function)
<module>rabbitMQ</module>
RabbitMqConfig.java
- package com.lyj.function.rabbitmq.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.annotation.EnableRabbit;
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitAdmin;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.retry.MessageRecoverer;
- import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
- import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
- import org.springframework.amqp.support.converter.MessageConverter;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * rabbitMq配置
- *
- */
- @Configuration
- @EnableRabbit //开启基于注解的RabbitMQ模式
- public class RabbitMqConfig {
-
- private final CachingConnectionFactory connectionFactory;
-
- public RabbitMqConfig(CachingConnectionFactory connectionFactory) {
- this.connectionFactory = connectionFactory;
- }
-
- /**
- * 消息监听器工厂
- *
- * @return
- */
- @Bean(name = "mqListenerContainer")
- public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory() {
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- factory.setConnectionFactory(connectionFactory);
- // 限流 一次性从队列中最大能拉取消息数
- factory.setPrefetchCount(50);
- return factory;
- }
-
- //其他方式声明监听器
- /*
- public SimpleMessageListenerContainer getObject() throws Exception {
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
- container.setAmqpAdmin(amqpAdmin);
- container.setConnectionFactory(connectionFactory);
- container.setQueues(queue);
- container.setPrefetchCount(20);
- container.setConcurrentConsumers(20);
- container.setMaxConcurrentConsumers(100);
- container.setDefaultRequeueRejected(Boolean.FALSE);
- container.setAdviceChain(createRetry());
- container.setAcknowledgeMode(autoAck ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL);
- // container.stop();
- if (Objects.nonNull(consumer)) {
- container.setMessageListener(consumer);
- }
- return container;
- }*/
-
- /**
- * 动态生成队列
- * @param connectionFactory
- * @return
- */
- @Bean
- public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
- return new RabbitAdmin(connectionFactory);
- }
-
- /**
- * 声明队列
- *
- * @return
- */
- @Bean
- public Queue createQueue() {
-
- /*
- * 第一种方式:
- * durable():代表需要持久化
- * exclusive(): 代表该队列独占(只允许有一个consumer监听)
- * autoDelete(): 代表需要自动删除(没有consumer自动删除)
- * withArgument(): 队列的其他参数
- */
- // return QueueBuilder.durable("boot_work_queue").exclusive().autoDelete().withArgument("key", "val").build();
- /*
- * 第二种方式:通过new Queue对象来创建队列
- * name:队列名称
- * durable:队列是否持久化,默认是true
- * exclusive:是否独占,队列是否设置为排他队列,默认是false。为true时设置为排他队列,只对首次声明它的连接可见,
- * 其他连接无法声明相同名称的其他队列,并且在连接断开时自动删除,即使持久化也会被删除
- * autoDelete:队列是否自动删除,默认false。为true时,当没有消费者使用此队列,该队列会自动删除
- * 一般设置一下队列的持久化就好,其余两个就是默认false
- */
- return new Queue("chat.room.queue", true, false, false);
- }
-
-
- /**
- * 声明死信队列
- *它用于处理无法成功被消费的消息。当消息无法被消费者正常处理时,通常会被发送到死信队列,以后进行进一步的处理或分析
- * @return
- */
- @Bean
- public Queue deadQueue() {
- Map<String, Object> map = new HashMap<>();
- // 队列中的每一个消息未被消费则5秒后过期,被自动删除并移到死信队列
- map.put("x-message-ttl", 5000);
- return new Queue("chat.dead.queue", true, false, false, map);
- }
-
-
- /**
- * 声明发布订阅模式交换机
- *将消息广播到与交换机绑定的所有队列
- * @return
- */
- @Bean
- FanoutExchange fanoutExchange() {
- /*
- * 第一种方式: 通过ExchangeBuilder构建交换机
- * 通过ExchangeBuilder声明交换机
- * 每种类型交换机有对应方法,如:fanoutExchange()、topicExchange()
- * - durable: 是否持久化
- * - autoDelete: 是否自动删除
- * - withArgument: 交换机其他参数
- * */
- // return ExchangeBuilder.fanoutExchange("boot_fanout_exchange").durable(true).build();
- // return ExchangeBuilder.directExchange("boot_direct_exchange").durable(true).autoDelete().withArgument("key","val").build();
- /*
- * 第二种方式:通过new FanoutExchange对象声明交换机
- * name:交换机名称
- * durable:是否持久化(默认false)
- * autoDelete:是否自动删除(默认false)
- * */
- return new FanoutExchange("fanout.exchange", true, false);
- }
-
- /**
- * 声明路由模式交换机
- *根据消息的路由键将消息发送到特定队列
- * @return
- */
- @Bean
- DirectExchange directExchange() {
- return new DirectExchange("direct.exchange", true, false);
- }
-
- /**
- * 声明主题模式交换机
- * 根据消息的路由键和通配符匹配将消息发送到多个队列
- */
- @Bean
- TopicExchange topicExchange() {
- return new TopicExchange("topic.exchange", true, false);
- }
-
- /**
- * 声明头交换机
- * 根据消息的自定义头部属性进行匹配路由
- */
- @Bean
- HeadersExchange headerExchange() {
- return new HeadersExchange("header.exchange", true, false);
- }
-
-
- /**
- * 交换机与队列进行绑定
- * 绑定成功后会持久化下来 需要手动解绑
- */
- @Bean
- public Binding bindQueueExchange() {
-
- /*
- * 第一种方式: 通过BindingBuilder绑定
- *
- * bind(Queue): 需要绑定的queue
- * to(Exchange): 需要绑定到哪个交换机
- * with(String): routing key
- * noargs(): 进行构建
- */
- // return BindingBuilder.bind(testQueue()).to(directExchange()).with("article").noargs();
- // return BindingBuilder.bind(testQueue()).to(directExchange()).with(testQueue().getName());
-
- /*
- * 第二种方式:通过new Binding对象绑定
- *
- * destination: 绑定的队列
- * destinationType: 绑定的类型 Binding.DestinationType.QUEUE: 绑定的类型为queue(交换机不仅可以绑定queue还可以绑定exchange)
- * exchange: 哪个交换机需要绑定
- * routingKey: routing key 路由键 Routing Key的作用是根据一定的规则将消息发送到匹配的队列中
- * arguments: 其他参数
- */
- return new Binding("chat.room.queue", Binding.DestinationType.QUEUE, "fanout.exchange", "chat.room.key", null);
- }
-
- /**
- * json格式消息转换
- *
- * @return
- */
- @Bean
- public MessageConverter jsonMessageConverter() {
- return new Jackson2JsonMessageConverter();
- }
-
- /**
- * 声明消费者失败失败处理队列
- * @return
- */
- @Bean
- public Queue errorQueue() {
- return new Queue("error.routing.key", true, false, false);
- }
-
- /**
- * 将消费者失败失败处理策略改为RepublishMessageRecoverer,首先定义接收失败消息的交换机、队列及其绑定关系;然后定义RepublishMessageRecoverer
- * @param rabbitTemplate
- * @return
- */
- @Bean
- public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
- return new RepublishMessageRecoverer(rabbitTemplate, "direct.exchange", "error.routing.key");
- }
-
- }
MQReturnCallbackCommonConfig.java
- package com.lyj.function.rabbitmq.config;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.BeansException;
- import org.springframework.context.ApplicationContext;
- import org.springframework.context.ApplicationContextAware;
- import org.springframework.context.annotation.Configuration;
-
-
- /**
- * rabbitmq Return机制
- * 每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置
- * @author faminefree
- */
- @Slf4j
- @Configuration
- public class MQReturnCallbackCommonConfig implements ApplicationContextAware {
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- // 获取RabbitTemplate
- RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
- // 设置ReturnCallback
- rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{
- log.info("消息发送失败,应答码:{},原因:{},交换机:{},路由键:{},消息:{}",replyCode,replyText,exchange,routingKey,message.toString());
- });
- }
- }
pom.xml(management)
-
- <dependency>
- <groupId>com.lyj.initMode</groupId>
- <artifactId>initMode-function-rabbitMQ</artifactId>
- </dependency>
application.yml
- spring:
- rabbitmq: #rabbitMQ
- host: 192.168.163.158 #主机地址
- username: admin #用户名guest
- password: admin #密码guest
- port: 5672 #默认端口5672
- virtual-host: master #虚拟主机
- #生产者重连
- template:
- retry:
- enabled: true #开启超时重试机制 重试机制是阻塞式的重试 如果对于业务性能有要求,建议禁用重试机制。如果需要使用,需要合理配置等待时长和重试次数,也可以考虑使用异步线程来执行发送消息。
- initial-interval: 1000 #失败后的初始等待时间
- multiplier: 1 #失败后下次的等待时长倍数,下次等待时长=initial-interval * multiplier
- max-attempts: 3 #最大重试次数
- #生产者确认机制
- #rabbitmq客户端发送消息首先发送的交换器exchange,然后通过路由键routingKey和bindingKey比较判定需要将消息发送到那个队列queue上;
- #在这个过程有两个地方消息可能丢失,第一消息发送到交换器exchange的过程,第二消息从交换器exchange发送到队列queue的过程;
- #publiser-confirm模式可以确保生产者到交换器exchange消息有没有发送成功
- #publisher-return模式可以在消息没有被路由到指定的queue时将消息返回,而不是丢弃
- #publisher-confirm-type有三种模式:
- #none:关闭confirm机制
- #simple:同步阻塞等待MQ的回执消息
- #correlated:MQ异步回调方式返回回执消息
- publisher-confirm-type: correlated #开启publisher confirm消息确认模式,并设置confirm类型
- publisher-returns: true #开启publisher return机制,一般不会开启 RabbitTemplate只能配置一个ReturnCallback
- #消费者确认机制
- listener:
- simple:
- prefetch: 1
- acknowledge-mode: none # none:关闭ack;manual:手动ack;auto:自动ack
- #消费者失败重试机制
- #失败消息处理策略
- #在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
- #RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式。
- #ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。
- #RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机。
- retry:
- enabled: true # 开启消费者失败重试
- initial-interval: 1000ms # 初始的失败等待时长为1秒
- multiplier: 1 # 下次失败的等待时长倍数,下次等待时长=multiplier*lost-interval
- max-attempts: 3 # 最大重试次数
- stateless: true # true:无状态;false:有状态。如果业务中包含事务,这里改为false
测试类
- package com.lyj.service.management.api;
-
-
- import cn.hutool.core.lang.UUID;
- import com.lyj.common.base.common.R;
- import io.swagger.annotations.Api;
- import io.swagger.annotations.ApiOperation;
- import lombok.AllArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitAdmin;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.util.concurrent.ListenableFutureCallback;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- /**
- * RabbitMq动态配置
- * 动态生成队列和交换机
- *
- */
- @AllArgsConstructor
- @RestController
- @RequestMapping("/api/test/mq")
- @Slf4j
- @Api(tags = "动态操作MQ队列交换机")
- public class TestRabbitMQApi {
- /**
- * 实现了AmqpAdmin接口
- */
- private RabbitAdmin rabbitAdmin;
-
- /**
- * 通过AmqpAdmin:创建和删除 Queue,Exchange,Binding
- */
- private AmqpAdmin amqpAdmin;
- private final RabbitTemplate rabbitTemplate;
-
-
- @PostMapping("/dynamic")
- @ApiOperation("绑定交换机和队列")
- public void dynamicConfig() {
- //创建mq队列
- Queue test3Queue = new Queue("test3_Queue", true, false, false);
- rabbitAdmin.declareQueue(test3Queue);
- //创建交换机
- DirectExchange direct1Exchange = new DirectExchange("direct1_Exchange", true, false);
- rabbitAdmin.declareExchange(direct1Exchange);
- //绑定交换机和队列,并设置Routing key
- Binding binding = BindingBuilder.bind(test3Queue).to(direct1Exchange).with(test3Queue.getName());
- rabbitAdmin.declareBinding(binding);
- }
-
- /**
- * 解绑队列与交换机
- */
- @GetMapping("/removeBinding")
- @ApiOperation("解绑队列与交换机")
- public void removeBinding() {
- Binding binding = new Binding("chat.room.queue", Binding.DestinationType.QUEUE, "fanout.exchange", "chat.room.key", null);
- rabbitAdmin.removeBinding(binding);
- }
-
- /**
- * 删除mq队列
- *
- * @return
- */
- @PostMapping("/deleteMq")
- @ApiOperation("删除mq队列")
- public String deleteMq(String mq) {
- rabbitAdmin.deleteQueue(mq);
- return "ok";
- }
-
- @PostMapping("/")
- @ApiOperation("绑定交换机和队列2")
- public void amqpAdminCreate(){
-
- // 创建Exchange
- amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
-
- /*
- * 创建消息队列
- * - Queue是类可以直接new,构造器第一个参数:队列名,第二参数:是否持久化存在,若没有指定参数则随机给队列名
- * */
- amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
-
- /*
- * 创建绑定规则
- * - 参数1:目的地(队列)
- * - 参数2:绑定的类型->队列
- * - 参数3:Exchange
- * - 参数4:路由件
- * - 参数5:参数没有为null
- * */
- amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null));
-
- //amqpAdmin.deleteExchange(); // 删除交换器
- //amqpAdmin.deleteQueue(); // 删除队列
- }
-
- @GetMapping("/test")
- @ApiOperation("测试MQ")
- public R<Boolean> testPublisherConfirm() throws InterruptedException {
- // 创建CorrelationData
- CorrelationData cd = new CorrelationData(UUID.fastUUID().toString());
- // Future添加ConfirmCallback
- cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
- @Override
- public void onFailure(Throwable e) {
- // Future发送异常时的处理逻辑,属于spring处理异常,不是mq返回的失败,基本不会触发
- log.error("handle message ack fail", e);
- }
- @Override
- public void onSuccess(CorrelationData.Confirm result) {
- // Future接收到回执的处理逻辑,参数中的result就是回执内容
- if (result.isAck()) {
- log.debug("发送消息成功,收到ack...");
- } else {
- log.error("发送消息失败,收到nack, reason:{}", result.getReason());
- }
- }
- });
- // 发送消息
- rabbitTemplate.convertAndSend("fanout.exchange", "chat.room.key", "hello", cd);
- return R.ok(Boolean.TRUE);
- }
- }
测试监听类
- package com.lyj.service.management.api;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- /**
- * 使用@RabbitListener注解方法上监听消息
- *
- */
- @Component
- @Slf4j
- public class TestMqReceiver {
-
- /**
- * 监听指定的消息队列(数组类型,可以指定监听多个)
- *
- * @param s
- */
- @RabbitListener(queues = {"chat.room.queue"})
- public void receive(String s) {
- log.info("使用@RabbitListener注解方法上监听消息:{}", s);
- }
-
- }
启动测试
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。