当前位置:   article > 正文

Springboot集成rabbitmq——消息持久化_springboot rabbitmq 消息持久化

springboot rabbitmq 消息持久化

目录

1.rabbitmq简介

2.消息持久化

3.发布确认

4.备份交换机

5.优先级比较

1.rabbitmq简介

 MQ(message queue),从字面意思上看,本质是个队列,遵从先入先出的规则,只不过队列中存放的内容是 message 而已,是一种跨进程的通信机制,用于上下游传递消息。RabbitMq是开发中常用的一种消息中间件,由于实现服务之间的消息转发。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。

它主要基于四大核心概念:生产者、交换机、队列、消费者。其工作原理如下:

 简单来说,工作原理如下:

生产者——>生成消息——>建立连接——>交换机——>队列——>建立连接——>消费者

2.消息持久化

在生产环境中偶尔会由于一些原因导致 RabbitMQ 重启,在 RabbitMQ 重启期间生产者消息投递失败,会 导致消息的丢失,需要手动处理和恢复。队列和交互机可以进行持久化操作,即rabbitmq重启不会导致队列和交换机的丢失。那么同样,我们也需要对消息进行持久化操作,保证消息的可靠性。

3.发布确认

我们可以通过发布确认的方式达到消息持久化的目的:

当消息发布出去后,没有收到交换机返回的确认信息时(即ack为false),我们即认为没有此次消息发布失败,即需要启用缓存机制缓存该消息。过一段时间后再次进行消息的发布。直至收到交换机返回的确认消息,从缓存中删除该消息。

具体实现代码如下:

1.编写配置文件:

  1. spring:
  2. rabbitmq:
  3. host: 127.0.0.1 #主机地址
  4. port: 5672 #端口号
  5. username: guest #用户名
  6. password: guest #密码
  7. publisher-confirm-type: correlated #发布确认模式:correlated即交换机收到消息后触发回调方法

2.编写rabbitmq配置类,在类中声明交换机及队列,其架构及代码如下:

  1. package com.seven.rabbitmq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class ConfirmConfig {
  8. public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
  9. public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
  10. //声明直接交换机Exchange
  11. @Bean("confirmExchange")
  12. public DirectExchange confirmExchange() {
  13. return new DirectExchange(CONFIRM_EXCHANGE_NAME);
  14. }
  15. //声明确认队列
  16. @Bean("confirmQueue")
  17. public Queue confirmQueue() {
  18. return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
  19. }
  20. //绑定交换机和队列
  21. @Bean
  22. public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
  23. @Qualifier("confirmExchange") DirectExchange exchange) {
  24. return BindingBuilder.bind(queue).to(exchange).with("key1");
  25. }
  26. }

 此处声明直接交换机confirm.exchange及队列confirm.queue,并通过routing key :key1绑定二者。

3.编写监听队列confirm.queue的消费者代码:

  1. package com.seven.rabbitmq.listener;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import static com.seven.rabbitmq.config.ConfirmConfig.CONFIRM_QUEUE_NAME;
  7. @Component
  8. @Slf4j
  9. public class ConfirmConsumer {
  10. @RabbitListener(queues = CONFIRM_QUEUE_NAME)
  11. public void receiveConfirmMsg(Message message){
  12. log.info("接收到confirm.queue队列消息:"+ new String(message.getBody()));
  13. }
  14. }

 4.在controller中编写生产者代码:

  1. @GetMapping("/confirm/send/{message}")
  2. public void sendMsgAndConfirm(@PathVariable String message){
  3. rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key1",""+message,new CorrelationData("1"));
  4. log.info("当前时间:{}, 发送信息:{}", new Date(), message);
  5. }

 代码中的 new CorrelationData("1") 代码用于给消息指定id

5.编写回调类:

  1. package com.seven.rabbitmq.callback;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.connection.CorrelationData;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.stereotype.Component;
  7. import javax.annotation.PostConstruct;
  8. import javax.annotation.Resource;
  9. @Component //step1
  10. @Slf4j
  11. public class MyCallBack implements RabbitTemplate.ConfirmCallback{
  12. @Resource //step2
  13. private RabbitTemplate rabbitTemplate;
  14. @PostConstruct //step3
  15. //后置注入,在创建完容器后,执行初始化方法前注入
  16. public void init(){
  17. //将自定义回调类注入RabbitTemplate中
  18. rabbitTemplate.setConfirmCallback(this);
  19. }
  20. /**
  21. * 交换机不管是否收到消息的一个回调方法
  22. * @param correlationData 消息相关数据
  23. * @param ack 交换机是否收到消息
  24. * @param cause 未收到消息的原因
  25. */
  26. @Override
  27. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  28. String id = correlationData != null ? correlationData.getId() : "";
  29. if (ack) {
  30. log.info("交换机已经收到 id 为:{}的消息", id);
  31. } else {
  32. log.info("交换机还未收到 id 为:{}消息,原因:{}", id, cause);
  33. }
  34. }
  35. }

在callback类中继承 RabbitTemplate.ConfirmCallback类,并实现里面的confirm方法。在confirm方法可以根据ACK的值进行对应的业务操作,比如:为false时,启用redisTemplate将该消息存入redis缓存中;为true时,调用数据库信息,进行操作等。此处可根据业务需求自信修改。

实现以上代码后,当交互机名称错误,或是交换机不存在等问题时,会调用回调方法,进行里面的逻辑调用,实现消息持久化:

(如上图,为交换机名称错误) 

而当routing key错误时,则不会触发上述错误。即在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

此问题我们可以通过设置回退消息解决:

1.在配置文件中添加配置:

  1. spring:
  2. rabbitmq:
  3. host: 127.0.0.1 #主机地址
  4. port: 5672 #端口号
  5. username: guest #用户名
  6. password: guest #密码
  7. publisher-confirm-type: correlated #发布确认模式:correlated即交换机收到消息后触发回调方法
  8. publisher-returns: true #回退消息,当找不到routing key对应的队列时,是否回退信息

2. 在回调类中添加代码,继承 RabbitTemplate.ReturnCallback类:

(此处注意:RabbitTemplate.ReturnCallback为旧版本使用类,

新版本请使用RabbitTemplate.ReturnsCallback)

  1. package com.seven.rabbitmq.callback;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.connection.CorrelationData;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.stereotype.Component;
  7. import javax.annotation.PostConstruct;
  8. import javax.annotation.Resource;
  9. @Component //step1
  10. @Slf4j
  11. public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
  12. @Resource //step2
  13. private RabbitTemplate rabbitTemplate;
  14. @PostConstruct //step3
  15. //后置注入,在创建完容器后,执行初始化方法前注入
  16. public void init(){
  17. //将自定义回调类注入RabbitTemplate中
  18. rabbitTemplate.setConfirmCallback(this);
  19. rabbitTemplate.setReturnCallback(this);
  20. }
  21. /**
  22. * 交换机不管是否收到消息的一个回调方法
  23. * @param correlationData 消息相关数据
  24. * @param ack 交换机是否收到消息
  25. * @param cause 未收到消息的原因
  26. */
  27. @Override
  28. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  29. String id = correlationData != null ? correlationData.getId() : "";
  30. if (ack) {
  31. log.info("交换机已经收到 id 为:{}的消息", id);
  32. } else {
  33. log.info("交换机还未收到 id 为:{}消息,原因:{}", id, cause);
  34. }
  35. }
  36. @Override
  37. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  38. log.error("消息:{},被交换机 {} 退回,原因:{},路由key:{},code:{}",
  39. new String(message.getBody()), exchange,
  40. replyText, routingKey, replyCode);
  41. }
  42. }

 同理,在returnedMessage方法中实现对应的业务逻辑。

在设置了消息确认和回退消息后,我们获得了对无法投递消息的感知能力,在生产者的消息无法被投递时发现并处理。

4.备份交换机

我们通常可以为队列设置死信队列来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。

在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进 入这个队列了。

同时,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警。

其主要架构如下:

修改rabbitmq配置类代码,声明备用交互机,备用队列及告警队列,并对直接交换机绑定其备用交换机 backup.exchange

  1. package com.seven.rabbitmq.config;
  2. import org.springframework.amqp.core.*;
  3. import org.springframework.beans.factory.annotation.Qualifier;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. @Configuration
  7. public class ConfirmConfig {
  8. public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
  9. public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
  10. //关于备份的
  11. public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
  12. public static final String BACKUP_QUEUE_NAME = "backup.queue";
  13. public static final String WARNING_QUEUE_NAME = "warning.queue";
  14. // //声明直接交换机Exchange
  15. // @Bean("confirmExchange")
  16. // public DirectExchange confirmExchange() {
  17. // return new DirectExchange(CONFIRM_EXCHANGE_NAME);
  18. // }
  19. //声明确认队列
  20. @Bean("confirmQueue")
  21. public Queue confirmQueue() {
  22. return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
  23. }
  24. //绑定交换机和队列
  25. @Bean
  26. public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
  27. @Qualifier("confirmExchange") DirectExchange exchange) {
  28. return BindingBuilder.bind(queue).to(exchange).with("key1");
  29. }
  30. //声明备份 Exchange
  31. @Bean("backupExchange")
  32. public FanoutExchange backupExchange() {
  33. return new FanoutExchange(BACKUP_EXCHANGE_NAME);
  34. }
  35. //声明确认 Exchange 交换机的备份交换机
  36. @Bean("confirmExchange")
  37. public DirectExchange confirmExchange() {
  38. ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
  39. .durable(true)
  40. //设置该交换机的备份交换机
  41. .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
  42. return exchangeBuilder.build();
  43. }
  44. // 声明警告队列
  45. @Bean("warningQueue")
  46. public Queue warningQueue() {
  47. return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
  48. }
  49. // 声明报警队列绑定关系
  50. @Bean
  51. public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
  52. @Qualifier("backupExchange") FanoutExchange backupExchange) {
  53. return BindingBuilder.bind(queue).to(backupExchange);
  54. }
  55. // 声明备份队列
  56. @Bean("backQueue")
  57. public Queue backQueue() {
  58. return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
  59. }
  60. // 声明备份队列绑定关系
  61. @Bean
  62. public Binding backupBinding(@Qualifier("backQueue") Queue queue,
  63. @Qualifier("backupExchange") FanoutExchange backupExchange) {
  64. return BindingBuilder.bind(queue).to(backupExchange);
  65. }
  66. }

 编写监听消费者:

  1. package com.seven.rabbitmq.listener;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import static com.seven.rabbitmq.config.ConfirmConfig.BACKUP_QUEUE_NAME;
  7. import static com.seven.rabbitmq.config.ConfirmConfig.WARNING_QUEUE_NAME;
  8. @Component
  9. @Slf4j
  10. public class BackupConsumer {
  11. @RabbitListener(queues = BACKUP_QUEUE_NAME)
  12. public void receiveConfirmMsg(Message message){
  13. log.info("备份队列接收到confirm.queue队列消息:"+ new String(message.getBody()));
  14. }
  15. @RabbitListener(queues = WARNING_QUEUE_NAME)
  16. public void receiveWarningMsg(Message message) {
  17. log.error("报警发现不可路由消息:{}", new String(message.getBody()));
  18. }
  19. }

若之前已写过 confirm.exchange 交换机,由于更改配置,需要删掉,不然会报错 

编写消费者,测试效果:

  1. @GetMapping("/confirm/send/{message}")
  2. public void sendMsgAndConfirm(@PathVariable String message){
  3. rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key1",""+message,new CorrelationData("1"));
  4. rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,"key2",""+message,new CorrelationData("2"));
  5. log.info("当前时间:{}, 发送信息:{}", new Date(), message);
  6. }

运行效果:

 可以看到routing key : key1正确发到直接交换机中,并发送到key1对应的队列中。

routing key : key2不存在,发生告警,启用备份交换机。备份队列和告警队列都收到备份交换机的信息,告警队列发出警告;备份队列进行业务处理。

两个消息都准确发到直接交换机中,收到发布确认消息。

5.优先级比较

由上述测试中可以看出,当交换机发布确认与备份交换机可以一起使用的时候,如果两者同时开启,备份交换机优先级更高

即当备份交换机宕机或是其他意外无法使用后,才会启用回退消息,将消息回退并缓存;否则启用备份交换机服务,由备份队列进行处理。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/471943
推荐阅读
相关标签
  

闽ICP备14008679号