赞
踩
引入依赖后:容器中自动配置RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate
<!-- rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# RabbitMQ配置
spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
# 虚拟主机配置
spring.rabbitmq.virtual-host=/
server.port=9999
大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。
然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。
直连型交换机,根据消息携带的路由键将消息投递给对应队列。
/** *@author: lzp *@create: 2022-11-09 20:14:43 *@description: rabbitmq配置类 */ @Configuration public class RabbitConfig { //队列 起名:TestDirectQueue @Bean public Queue TestDirectQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("TestDirectQueue",true); } //Direct交换机 起名:TestDirectExchange @Bean DirectExchange TestDirectExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("TestDirectExchange",true,false); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting"); } @Bean DirectExchange lonelyDirectExchange() { return new DirectExchange("lonelyDirectExchange"); } }
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "test message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map);
return "ok";
}
@org.springframework.amqp.rabbit.annotation.RabbitListener(queues = "TestDirectQueue")
@Component
public class RabbitListener {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("DirectReceiver消费者收到消息 : " + testMessage.toString());
}
}
配置多台监听绑定到同一个直连交互的同一个队列,那么会轮询收到消息,轮询的方式对消息进行消费,而且不存在重复消费。
扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
/** * @Author: lzp * @Description: * @Date: 2022/11/9 20:57 * @Version: 1.0 * @Modified By: */ @Configuration public class FanoutRabbitConfig { /** * 创建三个队列 :fanout.A fanout.B fanout.C * 将三个队列都绑定在交换机 fanoutExchange 上 * 因为是扇型交换机, 路由键无需配置,配置也不起作用 */ @Bean public Queue queueA() { return new Queue("fanout.A",true); } @Bean public Queue queueB() { return new Queue("fanout.B",true); } @Bean public Queue queueC() { return new Queue("fanout.C",true); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA() { return BindingBuilder.bind(queueA()).to(fanoutExchange()); } @Bean Binding bindingExchangeB() { return BindingBuilder.bind(queueB()).to(fanoutExchange()); } @Bean Binding bindingExchangeC() { return BindingBuilder.bind(queueC()).to(fanoutExchange()); } }
@GetMapping("/sendFanoutMessage")
public String sendFanoutMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: testFanoutMessage ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("fanoutExchange", null, map);
return "ok";
}
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("FanoutReceiverA消费者收到消息 : " +testMessage.toString());
}
}
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("FanoutReceiverB消费者收到消息 : " +testMessage.toString());
}
}
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("FanoutReceiverC消费者收到消息 : " +testMessage.toString());
}
}
ABC都能收到
只要发送到 fanoutExchange 这个扇型交换机的消息, 三个队列都绑定这个交换机,所以三个消息接收类都监听到了这条消息
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
* (星号) 用来表示一个单词 (必须出现的)
# (井号) 用来表示任意数量(零个或多个)单词
通配的绑定键是跟队列进行绑定的,举个小例子
队列Q1 绑定键为 .TT. 队列Q2绑定键为 TT.#
如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;
/** *@author: lzp *@create: 2022-11-09 20:33:31 *@description: 主题交换机 */ @Configuration public class TopicRabbitConfig { //绑定键 public final static String man = "topic.man"; public final static String woman = "topic.woman"; @Bean public Queue firstQueue() { return new Queue(TopicRabbitConfig.man,true); } @Bean public Queue secondQueue() { return new Queue(TopicRabbitConfig.woman,true); } /** *@Author: lzp *@Description: 主题交换机 *@Date: 2022/11/9 20:35 *@Params [] *@return org.springframework.amqp.core.TopicExchange **/ @Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); } //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man //这样只要是消息携带的路由键是topic.man,才会分发到该队列 @Bean Binding bindingExchangeMessage() { return BindingBuilder.bind(firstQueue()).to(exchange()).with(man); } //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.# // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列 @Bean Binding bindingExchangeMessage2() { return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#"); } }
@GetMapping("/sendTopicMessage1") public String sendTopicMessage1() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: M A N "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> manMap = new HashMap<>(); manMap.put("messageId", messageId); manMap.put("messageData", messageData); manMap.put("createTime", createTime); rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap); return "ok"; } @GetMapping("/sendTopicMessage2") public String sendTopicMessage2() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: woman is all "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String, Object> womanMap = new HashMap<>(); womanMap.put("messageId", messageId); womanMap.put("messageData", messageData); womanMap.put("createTime", createTime); rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap); return "ok"; }
/**
*@author: lzp
*@create: 2022-11-09 20:37:41
*@description: man
*/
@Component
@RabbitListener(queues = "topic.man")
public class TopicManReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("TopicManReceiver消费者收到消息 : " + testMessage.toString());
}
}
/** * @Author: lzp * @Description: * @Date: 2022/11/9 20:52 * @Version: 1.0 * @Modified By: */ @Component @RabbitListener(queues = "topic.woman") public class TopicTotalReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("TopicTotalReceiver消费者收到消息 : " + testMessage.toString()); } }
此时发送消息1 两个接收者都能收到
发送消息2 只有第二个监听者能收到(第二个绑定的路由键是topic.#,只要是topic开头的就可以)
死信队列和死信交换机和普通的队列一样,作用是,当普通队列中的消息没有被正常消费的时候,消息就会进到正常业务队列绑定的死信交换机和死信队列中去。保证消息不丢失。
进入死信队列的条件有三种:
1.消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
2.消息在队列的存活时间超过设置的TTL时间。
3.消息队列的消息数量已经超过最大队列长度。
@Configuration public class DeadRabbitConfig { public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange"; public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea"; public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb"; public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange"; public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey"; public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey"; public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea"; public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb"; // 声明业务Exchange @Bean("businessExchange") public FanoutExchange businessExchange(){ return new FanoutExchange(BUSINESS_EXCHANGE_NAME); } // 声明死信Exchange @Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE); } // 声明业务队列A @Bean("businessQueueA") public Queue businessQueueA(){ Map<String, Object> args = new HashMap<>(3); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY); // 消息过期时间 60s args.put("x-message-ttl", 10000); return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build(); } // 声明业务队列B @Bean("businessQueueB") public Queue businessQueueB(){ Map<String, Object> args = new HashMap<>(3); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY); // 消息过期时间 60s args.put("x-message-ttl", 60000); return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build(); } // 声明死信队列A @Bean("deadLetterQueueA") public Queue deadLetterQueueA(){ return new Queue(DEAD_LETTER_QUEUEA_NAME); } // 声明死信队列B @Bean("deadLetterQueueB") public Queue deadLetterQueueB(){ return new Queue(DEAD_LETTER_QUEUEB_NAME); } // 声明业务队列A绑定关系 @Bean public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange){ return BindingBuilder.bind(queue).to(exchange); } // 声明业务队列B绑定关系 @Bean public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange){ return BindingBuilder.bind(queue).to(exchange); } // 声明死信队列A绑定关系 @Bean public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY); } // 声明死信队列B绑定关系 @Bean public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY); } }
/**
* 测试死信队列
*/
@GetMapping("/sendDeadMessage")
public String sendDeadMessage(){
//发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口
User user = new User();
user.setAge(18);
user.setName("张三");
rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME, null, user,
//消息的唯一id 在后面的确认回调可以用到。
new CorrelationData(UUID.randomUUID().toString()));
return "ok";
}
为了测试效果 先不接收
发送消息后,看控制台。消息会先进到正常队列中去
等消息过期后,消息会进到死信队列
发送消息
//发送消息使用RabbitTemplate @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/sendMq") public String sendMessageTest() { for (int i = 0; i < 10; i++) { if(i%2==0){ OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity(); reasonEntity.setId(1L); reasonEntity.setCreateTime(new Date()); reasonEntity.setName("哈哈"+i); String msg = "Hello World"; //1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口 //2、发送的对象类型的消息,可以是一个json rabbitTemplate.convertAndSend( "hello-java-exchange",//交换机名字 "hello.java", //路由键,决定交换机把消息传给哪个队列 reasonEntity,//发送的消息为对象 new CorrelationData(UUID.randomUUID().toString()));//消息的唯一id 在后面的确认回调可以用到。 }else{ OrderEntity orderEntity = new OrderEntity(); orderEntity.setOrderSn(UUID.randomUUID().toString()); //1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口 //2、发送的对象类型的消息,可以是一个json rabbitTemplate.convertAndSend( "hello-java-exchange", "hello.java", orderEntity, new CorrelationData(UUID.randomUUID().toString())); } } return "ok"; }
发送的消息类型为对象时,需要将序列化后的数据转为json
@Configuration
public class MyRabbitConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
接收消息,监听消息
/** * channel:当前传输数据的通道 * queue:可以很多人都来监听,只要收到消息,队列删除消息,而且只能有一个收到此消息 * (1)服务启动多个,同一个消息只能有一个服务接收到 * (2)只有一个消息完全处理完,方法运行结束,才能接收到下一个消息 */ //参数类型可以是message 实体类 channel @RabbitHandler public void revieveMessage(Message message, OrderReturnReasonEntity content, Channel channel) { //拿到主体内容 byte[] body = message.getBody(); //拿到的消息头属性信息 MessageProperties messageProperties = message.getMessageProperties(); System.out.println("接受到的消息...内容" + message + "===内容:" + content); } @RabbitHandler public void revieveMessage(OrderEntity orderEntity) { System.out.println("接受到的消息...内容" + orderEntity); }
@RabbitHandler 可以接收特定对象 只能用在方法上
@RabbitListener(queues = {“hello-java-queue”}) 接收指定队列的消息 用在类上
两种确认
一种是消息到rabbitmq服务器就产生回调,告诉本地服务器他收到消息了,
另一种是消息从交换机到队列的监听,如果消息没有从交换机到队列(比如路由键不对),就会产生回调,告诉本地服务器哪个消息未到队列,
后期可以在发送消息的时候,同时在数据库持久化保存一份,并设置rabbit服务器是否正确收到消息这一字段,通过回调方法设置该字段的值,后期定期检查数据库,对未正确发送的消息进行处理
配置文件:
#服务端消息确认
#确认消息已发送到交换机(Exchange)
spring.rabbitmq.publisher-confirm-type=correlated
#确认消息已发送到队列(Queue)
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
配置类
@Configuration public class ConfirmConfig { /** * 消息确认机制 * @param connectionFactory * @return */ @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); //这里一定要设置 不然发送对象类型的消息会报错 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setConnectionFactory(connectionFactory); //到交换机回调 到不到交换机都会回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("123"); } }); //到队列回调 只有在消息未到队列时才会回调 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println(returnedMessage); } }); return rabbitTemplate; } }
保证每个消息都被正确消费,此时才可以broker删除这个消息
所以一定要手动确认
(1)默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除消息
配置文件
#手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
手动确认模式,只要我们没有明确告诉MQ,货物被签收了,没有ack,消息就一直是unacked状态,即使consumer宕机,消息也不会丢失,消息会重新变为ready状态,再有新的consumer连接,可以继续接收消息。
(2)channel.basicAck(deleverTag,false)签收,业务正常完成就应该签收 false:不是批量接收模式
(3)channel.basicNack(deleverTag,false,true)拒签 业务失败,拒签 false:不是批量接收模式,true:拒签后让消息重新回到queue
delivertag:
long deliveryTag = message.getMessageProperties().getDeliveryTag();//接收消息的自增长编号,1.2.3.4.5
手动确认最好不要继续让消息重新回到队列,这样会形成死循环,消息一直不能被消费,一直回到队列。有两种处理方式:
配置的方法就是在声明队列的时候,添加参数 x-dead-letter-exchange 及 x-dead-letter-routing-key,其实就是在消费失败时,将消息使用该 exchange 及 routing key 发送至指定队列。
当你在消费消息时,如果队列里的消息出现以下情况:
1.消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
2.消息在队列的存活时间超过设置的TTL时间。
3.消息队列的消息数量已经超过最大队列长度。
这时消息会进入死信队列。我们可以不监听死信队列,定期到rabbit mq服务器去查看,把进入死信队列的消息再发一遍。或者监听,进一步处理数据(保存到数据库)
catch中将消息保存到数据库,然后手动决绝这条消息,requeue指定为false将消息丢弃。然后定期检查数据库,进行消息重发。
消息发送出去,由于网络问题没有抵达服务器。
做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机制,可记录到数据库,采用定期扫描重发的方式,每个消息状态是否都被服务器收到都应该记录。
做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发。
消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。
publisher也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。
自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机
一定开启手动ACK,消费成功才移除,失败或者没来得及处理就noAck,对消息进行数据库保存或者使用死信队列,定期检查进行消息重发。
消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息重新由unack变为ready,并发送给其他消费者
消息消费失败,由于重试机制,自动又将消息发送出去
成功消费,ack时宕机,消息由unack变为ready,Broker又重新发送
消费者的业务消费接口应该设计为幂等性的。比如扣库存有 工作单的状态标志
使用防重表(redis/mysql),发送消息每一个都有业务的唯 一标识,处理过就不用处理
rabbitMQ的每一个消息都有redelivered字段,可以获取是否 是被重新投递过来的,而不是第一次投递过来的
消费者宕机积压
消费者消费能力不足积压
发送者发送流量太大
上线更多的消费者,进行正常消费
上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理
docker pull rabbitmq:management
docker run --privileged=true
-d -p 5672:5672 -p 15672:15672
–name rabbitmq
-v /lzp/rabbitmq/data:/var/lib/rabbitmq
–restart=always
-e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin
rabbitmq:management
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。