当前位置:   article > 正文

如何确保消息的可靠性?RabbitMQ 在Springboot中的应用案例_springboot rabbitmq 消息可靠性使用demo

springboot rabbitmq 消息可靠性使用demo

案例介绍

使用mq发送邮件的优点在于:

  1. 能实现异步处理,提高系统的并发性和相应速度
  2. 更加灵活,只需要一个邮件系统就能和其他系统共用
  3. 能够确保消息可靠,提供了消息持久化消息确认机制等特性

这里我们以用户注册后需要同时发送邮件和短信这个场景做为示例,流程图如下所示。

以注册系统、邮件系统、短信系统为例

不介绍rabbitMQ的基础信息了,直接进入代码环节。

案例实操

生产者(注册系统)

pom文件中引入相关依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. <!-- 版本继承springboot -->
  5. </dependency>

application.yml文件配置

  1. # rabbitmq
  2. spring:
  3. rabbitmq:
  4. port: 5672
  5. host: localhost
  6. username: guest
  7. password: guest
  8. virtual-host: /
  9. publisher-returns: true #开启生产者手动确认
  10. publisher-confirm-type: correlated #消息确认类型

做完这些就已经成功将rabbitMQ引入到Springboot中了,接下来是生产者中的配置类,这里使用的推送方式是topic

topic交换器是指按照正则表达式模糊匹配:用消息的Routing Key与 Exchange和Queue 之间的BindingKey进行模糊匹配,如果匹配成功,将消息分发到该Queue。 Routing Key是一个句点号“. ”分隔的字符串(我们将被句点号“.”分隔开的每一段独立的字符串称为一个单词)。Binding Key与Routing Key一样也是句点号“. ”分隔的字符串。Binding
Key中可以存在两种特殊字符“ * ”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

  1. import org.springframework.amqp.core.TopicExchange;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. @Configuration
  5. public class LoginRabbitConfig {
  6. private static final String EXCHANGE_NAME_TOPIC = "测试用Topic交换机";
  7. // 声明交换机 topic
  8. @Bean
  9. public TopicExchange topicExchange() {
  10. // 是否持久化、是否自动删除
  11. return new TopicExchange(EXCHANGE_NAME_TOPIC, true, false);
  12. }
  13. }

模拟一下注册的接口,因为注册后需要发送邮件和短信提醒用户,如果按照平时的顺序调用不仅耗时长,并且一旦邮件或短信发送失败没有进行异常处理话会导致注册失败,因此采用消息队列能够很好的解决这一问题,代码中UserPOJO只是定义的一个实体类

  1. import com.alibaba.fastjson.JSON;
  2. import gwc.mq.pojo.UserPOJO;
  3. import org.springframework.amqp.rabbit.connection.CorrelationData;
  4. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. import java.util.UUID;
  9. @RestController
  10. public class LoginController {
  11. @Autowired
  12. private RabbitTemplate rabbitTemplate;
  13. @RequestMapping("/register")
  14. public String register(UserPOJO userPOJO) {
  15. Object msg = JSON.toJSONString(userPOJO);
  16. // 设置ConfirmCallback
  17. rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
  18. if (ack) {
  19. // 消息发送成功
  20. System.out.println("消息发送成功, correlationData: " + correlationData.getReturnedMessage());
  21. } else {
  22. // 消息发送失败
  23. System.out.println("消息发送失败, cause: " + cause);
  24. }
  25. });
  26. try {
  27. rabbitTemplate.convertAndSend("测试用Topic交换机", "*", msg, new CorrelationData(UUID.randomUUID().toString()));
  28. } catch (Exception e) {
  29. e.printStackTrace();
  30. // 失败处理 无论出现那种情况都将错误消息存到redis中 然后用定时任务统一发送
  31. }
  32. return "用户-" + userPOJO.getName() + "-注册成功!";
  33. }
  34. }

在配置类中,我们打开了消息发送方的消息确认机制,因此在这里我们需要setConfirmCallback函数,其中correlationData是具体的消息,ack表示是否发送成功,cause则是失败的具体原因。

发送方发送失败的原因有三种可归为

(1)producter连接mq失败,消息没有发送到mq
(2)producter连接mq成功,但是发送到exchange失败
(3)消息发送到exchange成功,但是路由到queue失败

无论出现哪一种异常,我们都可以通过try catch来进行错误消息的处理,我采用的是捕获到错误后将消息存入db中(redis),再通过springboot的定时任务进行统一的重发,存入db代码就不再描述。

消费者(以邮件系统为例)

application.yml文件配置

  1. #rabbitMQ
  2. spring:
  3. rabbitmq:
  4. port: 5672
  5. host: localhost
  6. username: guest
  7. password: guest
  8. virtual-host: /
  9. listener:
  10. simple:
  11. acknowledge-mode: manual #开启手动确认机制

邮件系统的配置类

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.beans.factory.annotation.Qualifier;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. @Configuration
  6. public class MailRabbitConfig {
  7. private static final String EXCHANGE_NAME_TOPIC = "测试用Topic交换机";
  8. private static final String QUEUE_NAME = "email_queue";
  9. private static final String ERR_EXCHANGE_NAME_DIRECT = "死信交换机";
  10. private static final String ERR_QUEUE_NAME = "err_email_queue";
  11. // 声明交换机 topic
  12. @Bean
  13. public TopicExchange topicExchange() {
  14. // 是否持久化、是否自动删除
  15. return new TopicExchange(EXCHANGE_NAME_TOPIC, true, false);
  16. }
  17. // 声明队列
  18. @Bean
  19. public Queue queue() {
  20. // 是否持久化、是否当前连接对象独占、是否自动删除
  21. return new Queue(QUEUE_NAME, true, false, false);
  22. }
  23. // 声明绑定关系
  24. @Bean
  25. public Binding queueBinding(Queue queue, TopicExchange topicExchange) {
  26. return BindingBuilder.bind(queue).to(topicExchange).with("*.mail");
  27. }
  28. // 声明死信交换机 direct
  29. @Bean
  30. public DirectExchange directExchange() {
  31. // 是否持久化、是否自动删除
  32. return new DirectExchange(ERR_EXCHANGE_NAME_DIRECT, true, false);
  33. }
  34. // 声明死信队列
  35. @Bean
  36. public Queue errQueue() {
  37. // 是否持久化、是否当前连接对象独占、是否自动删除
  38. return new Queue(ERR_QUEUE_NAME, true, false, false);
  39. }
  40. // 声明绑定关系
  41. @Bean
  42. public Binding errQueueBinding(@Qualifier("errQueue")Queue errQueue, @Qualifier("directExchange")DirectExchange directExchange) {
  43. return BindingBuilder.bind(errQueue).to(directExchange).with("err.mail");
  44. }
  45. }

邮件发送服务进行监听

  1. import com.alibaba.fastjson.JSON;
  2. import com.rabbitmq.client.Channel;
  3. import gwc.mq.pojo.UserPOJO;
  4. import org.springframework.amqp.core.*;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.stereotype.Component;
  9. import java.io.IOException;
  10. import java.util.HashMap;
  11. import java.util.Map;
  12. // 死信队列
  13. // 消息出现问题后则会进入死信交换机,然后进入死信队列
  14. // 建立一个消费者根据routingkey监听死信队列 即可处理不同的死信队列中的数据
  15. @Component
  16. public class MailListener {
  17. private static final int MAX_RETRY = 3;
  18. private static final String ERR_EXCHANGE_NAME_DIRECT = "死信交换机";
  19. private static final String ERR_QUEUE_NAME = "err_email_queue";
  20. @Autowired
  21. private RabbitTemplate rabbitTemplate;
  22. @Autowired
  23. private TopicExchange topicExchange;
  24. @Autowired
  25. private AmqpAdmin amqpAdmin;
  26. @RabbitListener(queues = "email_queue")
  27. public void sendMail(Message message, Channel channel) throws IOException {
  28. try {
  29. // 睡眠1秒
  30. try {
  31. Thread.sleep(1000);
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. String s = new String(message.getBody());
  36. UserPOJO userPOJO = JSON.parseObject(s, UserPOJO.class);
  37. int a = 1 / 0; // 造成异常
  38. System.out.println("Mail系统收到 userPOJO=" + userPOJO);
  39. } catch (Exception e) {
  40. int retryCount = getRetryCount(message);
  41. System.out.println("Mail系统出现异常 当前retryCount =" + retryCount);
  42. if (retryCount < MAX_RETRY) {
  43. // 重试次数未达到最大次数,将消息重新发送到主队列,并增加重试次数
  44. MessageProperties properties = message.getMessageProperties();
  45. properties.setHeader("retry_count", retryCount + 1);
  46. rabbitTemplate.send(topicExchange.getName(), "info.mail", message);
  47. } else {
  48. // 设置队列属性
  49. Map<String, Object> arguments = new HashMap<String, Object>();
  50. // 设置队列的TTL
  51. arguments.put("x-message-ttl", 10000);
  52. arguments.put("x-dead-letter-exchange", ERR_EXCHANGE_NAME_DIRECT);// 设置死信队列的交换器名称
  53. arguments.put("x-dead-letter-routing-key", "err.mail");// 设置死信队列的路由键
  54. // 发送给TTL队列
  55. amqpAdmin.declareQueue(new Queue("TTL_email_queue", true, false, false, arguments));
  56. amqpAdmin.declareBinding(new Binding("TTL_email_queue", Binding.DestinationType.QUEUE, "死信交换机", "ttl_email", null));
  57. // 发送消息到TTL队列 此队列无消费者 在消息过期后会自动转发到配置的死信队列中去
  58. rabbitTemplate.send(ERR_EXCHANGE_NAME_DIRECT, "ttl_email", message);
  59. }
  60. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
  61. return;
  62. }
  63. // 需要消息确认ACK
  64. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 这里false表示是否开启批量应答
  65. /*
  66. 何为批量应答?
  67. 比如说channel上有传送tag的消息5,6,7,8,当前tag是8,那么此时5-8的这些还未应答的消息都会被确认收到消息应答 即批量应答
  68. */
  69. }
  70. // 死信消费
  71. @RabbitListener(queues = ERR_QUEUE_NAME)
  72. public void doFailedInformation(Message message, Channel channel) throws IOException {
  73. // 再次消费信息
  74. try {
  75. String s = new String(message.getBody());
  76. UserPOJO userPOJO = JSON.parseObject(s, UserPOJO.class);
  77. System.out.println("死信消费者消费死信队列: " + userPOJO);
  78. // 消费消息
  79. } catch (Exception e) {
  80. e.printStackTrace();
  81. // 表中字段 success修改为0
  82. // 发送一封邮件给操作人
  83. }
  84. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  85. }
  86. // 获取重试次数
  87. private int getRetryCount(Message message) {
  88. Object retryCountObj = message.getMessageProperties().getHeaders().get("retry_count");
  89. if (retryCountObj instanceof Integer) {
  90. return (Integer) retryCountObj;
  91. }
  92. return 0;
  93. }
  94. }

两个方法分别监听业务队列和死信队列,如果消息消费出现异常,则重新将消息放入队列尾部,如果重试次数达到三次则将此消息放入TTL队列中,TTL队列中的消息会根据配置的过期时间、死信交换机、以及死信交换机上的routingkey对消息进行投送,进入相应的死信队列,然后再通过死信消费者进行消费处理,此时若再次发送失败,则发送邮件提醒人员进行手工发送来确保消息的有效性。

消费者确保消息的可靠性通过下示代码进行消息的确认

  1. /*
  2. (1)channel.basicAck 用于肯定确认,RabbitMQ已经知道该消息并且成功地处理消息,可以将其丢弃了
  3. (2)channel.basicNack 用于否定确认
  4. (3)channel.basicReject 用于否定确认,与channel.basicNack相比少一个参数,不处理该消息了直接
  5. 拒绝,可以将其丢弃了
  6. */
  7. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

总结

消息发送的流程大致可化为 生产者(业务接口) ->  mq -> 交换机 -> 队列 -> 消费者

生产者消息确认机制可以确保在前半部分的有效性,消费者手动确认机制可以确保在后半部分的有效性,而一旦消息连续失败多次,我们还有保底方案通过定时任务扫描DB获取失败的消息转而通过人工发送,这样就可以在全流程上确保消息的可靠性了,这里仅仅是我个人的一套保证可靠性的方案,如果有其他更为可行的方案欢迎评论区补充

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/556975
推荐阅读
相关标签
  

闽ICP备14008679号