赞
踩
目录
MQ(message queue),从字面意思上看,本质是个队列,遵从先入先出的规则,只不过队列中存放的内容是 message 而已,是一种跨进程的通信机制,用于上下游传递消息。RabbitMq是开发中常用的一种消息中间件,由于实现服务之间的消息转发。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。
它主要基于四大核心概念:生产者、交换机、队列、消费者。其工作原理如下:
简单来说,工作原理如下:
生产者——>生成消息——>建立连接——>交换机——>队列——>建立连接——>消费者
在生产环境中偶尔会由于一些原因导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者消息投递失败,会 导致消息的丢失,需要手动处理和恢复。队列和交互机可以进行持久化操作,即rabbitmq重启不会导致队列和交换机的丢失。那么同样,我们也需要对消息进行持久化操作,保证消息的可靠性。
我们可以通过发布确认的方式达到消息持久化的目的:
当消息发布出去后,没有收到交换机返回的确认信息时(即ack为false),我们即认为没有此次消息发布失败,即需要启用缓存机制缓存该消息。过一段时间后再次进行消息的发布。直至收到交换机返回的确认消息,从缓存中删除该消息。
具体实现代码如下:
1.编写配置文件:
- spring:
- rabbitmq:
- host: 127.0.0.1 #主机地址
- port: 5672 #端口号
- username: guest #用户名
- password: guest #密码
- publisher-confirm-type: correlated #发布确认模式:correlated即交换机收到消息后触发回调方法
2.编写rabbitmq配置类,在类中声明交换机及队列,其架构及代码如下:
- package com.seven.rabbitmq.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class ConfirmConfig {
- public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
- public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
-
-
- //声明直接交换机Exchange
- @Bean("confirmExchange")
- public DirectExchange confirmExchange() {
- return new DirectExchange(CONFIRM_EXCHANGE_NAME);
- }
-
- //声明确认队列
- @Bean("confirmQueue")
- public Queue confirmQueue() {
- return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
- }
-
- //绑定交换机和队列
- @Bean
- public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
- @Qualifier("confirmExchange") DirectExchange exchange) {
- return BindingBuilder.bind(queue).to(exchange).with("key1");
- }
-
- }
此处声明直接交换机confirm.exchange及队列confirm.queue,并通过routing key :key1绑定二者。
3.编写监听队列confirm.queue的消费者代码:
- package com.seven.rabbitmq.listener;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- import static com.seven.rabbitmq.config.ConfirmConfig.CONFIRM_QUEUE_NAME;
-
- @Component
- @Slf4j
- public class ConfirmConsumer {
- @RabbitListener(queues = CONFIRM_QUEUE_NAME)
- public void receiveConfirmMsg(Message message){
- log.info("接收到confirm.queue队列消息:"+ new String(message.getBody()));
- }
- }
4.在controller中编写生产者代码:
- @GetMapping("/confirm/send/{message}")
- public void sendMsgAndConfirm(@PathVariable String message){
-
- rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key1",""+message,new CorrelationData("1"));
- log.info("当前时间:{}, 发送信息:{}", new Date(), message);
- }
代码中的 new CorrelationData("1") 代码用于给消息指定id
5.编写回调类:
- package com.seven.rabbitmq.callback;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
- import javax.annotation.Resource;
-
- @Component //step1
- @Slf4j
- public class MyCallBack implements RabbitTemplate.ConfirmCallback{
-
- @Resource //step2
- private RabbitTemplate rabbitTemplate;
-
- @PostConstruct //step3
- //后置注入,在创建完容器后,执行初始化方法前注入
- public void init(){
- //将自定义回调类注入RabbitTemplate中
- rabbitTemplate.setConfirmCallback(this);
- }
-
- /**
- * 交换机不管是否收到消息的一个回调方法
- * @param correlationData 消息相关数据
- * @param ack 交换机是否收到消息
- * @param cause 未收到消息的原因
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- String id = correlationData != null ? correlationData.getId() : "";
- if (ack) {
- log.info("交换机已经收到 id 为:{}的消息", id);
- } else {
- log.info("交换机还未收到 id 为:{}消息,原因:{}", id, cause);
- }
- }
-
- }
-
在callback类中继承 RabbitTemplate.ConfirmCallback类,并实现里面的confirm方法。在confirm方法可以根据ACK的值进行对应的业务操作,比如:为false时,启用redisTemplate将该消息存入redis缓存中;为true时,调用数据库信息,进行操作等。此处可根据业务需求自信修改。
实现以上代码后,当交互机名称错误,或是交换机不存在等问题时,会调用回调方法,进行里面的逻辑调用,实现消息持久化:
(如上图,为交换机名称错误)
而当routing key错误时,则不会触发上述错误。即在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。
此问题我们可以通过设置回退消息解决:
1.在配置文件中添加配置:
- spring:
- rabbitmq:
- host: 127.0.0.1 #主机地址
- port: 5672 #端口号
- username: guest #用户名
- password: guest #密码
- publisher-confirm-type: correlated #发布确认模式:correlated即交换机收到消息后触发回调方法
- publisher-returns: true #回退消息,当找不到routing key对应的队列时,是否回退信息
2. 在回调类中添加代码,继承 RabbitTemplate.ReturnCallback类:
(此处注意:RabbitTemplate.ReturnCallback为旧版本使用类,
新版本请使用RabbitTemplate.ReturnsCallback)
- package com.seven.rabbitmq.callback;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.PostConstruct;
- import javax.annotation.Resource;
-
- @Component //step1
- @Slf4j
- public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
-
- @Resource //step2
- private RabbitTemplate rabbitTemplate;
-
- @PostConstruct //step3
- //后置注入,在创建完容器后,执行初始化方法前注入
- public void init(){
- //将自定义回调类注入RabbitTemplate中
- rabbitTemplate.setConfirmCallback(this);
- rabbitTemplate.setReturnCallback(this);
- }
-
- /**
- * 交换机不管是否收到消息的一个回调方法
- * @param correlationData 消息相关数据
- * @param ack 交换机是否收到消息
- * @param cause 未收到消息的原因
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- String id = correlationData != null ? correlationData.getId() : "";
- if (ack) {
- log.info("交换机已经收到 id 为:{}的消息", id);
- } else {
- log.info("交换机还未收到 id 为:{}消息,原因:{}", id, cause);
- }
- }
-
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
- log.error("消息:{},被交换机 {} 退回,原因:{},路由key:{},code:{}",
- new String(message.getBody()), exchange,
- replyText, routingKey, replyCode);
- }
- }
同理,在returnedMessage方法中实现对应的业务逻辑。
在设置了消息确认和回退消息后,我们获得了对无法投递消息的感知能力,在生产者的消息无法被投递时发现并处理。
我们通常可以为队列设置死信队列来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。
在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进 入这个队列了。
同时,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。
其主要架构如下:
修改rabbitmq配置类代码,声明备用交互机,备用队列及告警队列,并对直接交换机绑定其备用交换机 backup.exchange
- package com.seven.rabbitmq.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class ConfirmConfig {
- public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
- public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
- //关于备份的
- public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
- public static final String BACKUP_QUEUE_NAME = "backup.queue";
- public static final String WARNING_QUEUE_NAME = "warning.queue";
-
- // //声明直接交换机Exchange
- // @Bean("confirmExchange")
- // public DirectExchange confirmExchange() {
- // return new DirectExchange(CONFIRM_EXCHANGE_NAME);
- // }
-
- //声明确认队列
- @Bean("confirmQueue")
- public Queue confirmQueue() {
- return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
- }
-
- //绑定交换机和队列
- @Bean
- public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
- @Qualifier("confirmExchange") DirectExchange exchange) {
- return BindingBuilder.bind(queue).to(exchange).with("key1");
- }
-
-
- //声明备份 Exchange
- @Bean("backupExchange")
- public FanoutExchange backupExchange() {
- return new FanoutExchange(BACKUP_EXCHANGE_NAME);
- }
-
- //声明确认 Exchange 交换机的备份交换机
- @Bean("confirmExchange")
- public DirectExchange confirmExchange() {
- ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
- .durable(true)
- //设置该交换机的备份交换机
- .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
- return exchangeBuilder.build();
- }
-
-
- // 声明警告队列
- @Bean("warningQueue")
- public Queue warningQueue() {
- return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
- }
-
- // 声明报警队列绑定关系
- @Bean
- public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
- @Qualifier("backupExchange") FanoutExchange backupExchange) {
- return BindingBuilder.bind(queue).to(backupExchange);
- }
-
- // 声明备份队列
- @Bean("backQueue")
- public Queue backQueue() {
- return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
- }
-
- // 声明备份队列绑定关系
- @Bean
- public Binding backupBinding(@Qualifier("backQueue") Queue queue,
- @Qualifier("backupExchange") FanoutExchange backupExchange) {
- return BindingBuilder.bind(queue).to(backupExchange);
- }
- }
编写监听消费者:
- package com.seven.rabbitmq.listener;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import static com.seven.rabbitmq.config.ConfirmConfig.BACKUP_QUEUE_NAME;
- import static com.seven.rabbitmq.config.ConfirmConfig.WARNING_QUEUE_NAME;
-
- @Component
- @Slf4j
- public class BackupConsumer {
-
- @RabbitListener(queues = BACKUP_QUEUE_NAME)
- public void receiveConfirmMsg(Message message){
- log.info("备份队列接收到confirm.queue队列消息:"+ new String(message.getBody()));
- }
-
- @RabbitListener(queues = WARNING_QUEUE_NAME)
- public void receiveWarningMsg(Message message) {
- log.error("报警发现不可路由消息:{}", new String(message.getBody()));
- }
- }
若之前已写过 confirm.exchange
交换机,由于更改配置,需要删掉,不然会报错
编写消费者,测试效果:
- @GetMapping("/confirm/send/{message}")
- public void sendMsgAndConfirm(@PathVariable String message){
-
- rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key1",""+message,new CorrelationData("1"));
- rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key2",""+message,new CorrelationData("2"));
- log.info("当前时间:{}, 发送信息:{}", new Date(), message);
- }
运行效果:
可以看到routing key : key1正确发到直接交换机中,并发送到key1对应的队列中。
routing key : key2不存在,发生告警,启用备份交换机。备份队列和告警队列都收到备份交换机的信息,告警队列发出警告;备份队列进行业务处理。
两个消息都准确发到直接交换机中,收到发布确认消息。
由上述测试中可以看出,当交换机发布确认与备份交换机可以一起使用的时候,如果两者同时开启,备份交换机优先级更高。
即当备份交换机宕机或是其他意外无法使用后,才会启用回退消息,将消息回退并缓存;否则启用备份交换机服务,由备份队列进行处理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。