当前位置:   article > 正文

如何保证消息的可靠性+延迟队列(TTL+死信队列+延迟队列)_springboot使用死信队列保证消息可靠性

springboot使用死信队列保证消息可靠性

目录

1.如何保证消息的可靠性

1.1.消息的可靠投递

confirm机制

return机制

1.2.如何保证消息在队列中不丢失

1.3.确保消息能可靠的被消费掉

2.延迟队列

2.1.TTL

2.2.死信队列

2.3.延迟队列

3.如何防止消费者重复消费消息


1.如何保证消息的可靠性

1.1.消息的可靠投递

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式
  • return 退回模式
  • 消息从 producer 到 exchange 则会返回一个 confirmCallback 。
  • 消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。

默认rabbitmq不开启上面两种模式

我们将利用这两个 callback 控制消息的可靠性投递

confirm和return的实现  

  1. 设置ConnectionFactory的publisher-confirm-type: correlated开启 确认模式。

  2. 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理。

  3. 设置ConnectionFactory的publisher-returns="true" 开启 退回模式。

  4. 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后执行回调函数returnedMessage。

confirm机制

演示:4.springboot整合RabbitMQ

(1).配置文件中开启confirm

  1. #开启confirm确认机制
  2. spring.rabbitmq.publisher-confirm-type=correlated

(2)设置rabbitTemplate的confirmCallback回调函数

  1. /**
  2. * 使用confirm机制:
  3. * (1)需要开启confirm机制。-----配置文件中加入:spring.rabbitmq.publisher-confirm-type=correlated
  4. * (2)为rabbitTemplate指定setConfirmCallback回调函数
  5. */
  6. @Test
  7. void test001() {
  8. //只能保证消息从生产者到交换机的可靠性
  9. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  10. @Override
  11. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  12. //触发该方法
  13. if (ack == false) {
  14. System.out.println("未来根据项目的需要,完成相应的操作");
  15. }
  16. }
  17. });
  18. rabbitTemplate.convertAndSend("Topics-exchange", "lazy.aaa", "Hello RabbitMQ...");
  19. }

return机制

(1).配置文件中开启return

  1. #开启return机制
  2. spring.rabbitmq.publisher-returns=true

(2)设置rabbitTemplate的return回调函数

  1. /**
  2. * return机制:
  3. * (1)开启return机制---配置文件加入:spring.rabbitmq.publisher-returns=true
  4. * (2)为rabbitTemplate设置return的回调函数
  5. */
  6. @Test
  7. void test002() {
  8. //只有当消息无法从交换机到队列时才会触发
  9. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  10. @Override
  11. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  12. //为交换机到队列分发消息失败时会触发
  13. System.out.println("replyCode====" + replyCode);
  14. System.out.println("replyText====" + replyText);
  15. }
  16. });
  17. rabbitTemplate.convertAndSend("Topics-exchange", "lazy.aaa", "Hello RabbitMQ...");
  18. }

1.2.如何保证消息在队列中不丢失

(1)设置队列为持久化

(2)设置消息的持久化

1.3.确保消息能可靠的被消费掉

ACK确认机制

多个消费者同时收取消息,收取消息到一半,突然某个消费者挂掉,要保证此条消息不丢失,就需要acknowledgement机制,就是消费者消费完要通知服务端,服务端才将数据删除

这样就解决了,即使一个消费者出了问题,没有同步消息给服务端,还有其他的消费端去消费,保证了消息不丢的case。

ACK的实现

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。

有三种确认方式:

  • 自动确认:acknowledge="none"
  • 手动确认:acknowledge="manual"
  • 根据异常情况确认:acknowledge="auto"(这种方式使用麻烦,并且不常用)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息队列中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。

如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

消费端:

(1).修改消费端----手动确认消息

  1. #修改消费端----手动确认消息
  2. spring.rabbitmq.listener.simple.acknowledge-mode=manual

(2).修改代码

  

  1. package com.wqg.listener;
  2. import com.rabbitmq.client.Channel;
  3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  4. import org.springframework.stereotype.Component;
  5. import org.springframework.amqp.core.Message;
  6. import java.io.IOException;
  7. /**
  8. * @ fileName:MyListener
  9. * @ description:
  10. * @ author:wqg
  11. * @ createTime:2023/7/12 18:57
  12. */
  13. @Component
  14. public class MyListener {
  15. //basicAck:确认消息----rabbit服务端删除
  16. //basicNack:服务继续发送消息
  17. @RabbitListener(queues = {"Topics-queue002"})//queues:表示你监听的队列名
  18. public void h(Message message , Channel channel) throws IOException {
  19. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  20. //把监听到的消息封装到Message类对象中
  21. byte[] body = message.getBody();
  22. String s = new String(body);
  23. System.out.println("消息内容==="+s);
  24. try {
  25. //int a = 10/0; //模拟宕机
  26. System.out.println("核心业务的处理~~~");
  27. /**
  28. *long deliveryTag: 消息的标注
  29. * boolean multiple:是否把该消息之前未确认的消息一起确认掉
  30. */
  31. channel.basicAck(deliveryTag,true);//确认消息
  32. } catch (Exception e) {
  33. /**
  34. * long deliveryTag; boolean multiple;
  35. * boolean requeue:是否要求rabbitmq服务重新发送该消息
  36. */
  37. channel.basicNack(deliveryTag,true,true);
  38. }
  39. }
  40. }

总结: 如何保证消息的可靠性?

  • 保证消息的可靠性投递: confirm机制和return机制
  • 队列中:---持久化
  • 使用ack机制保证消费者的可靠性消费。

2.延迟队列

2.1.TTL

TTL 全称 Time To Live(存活时间/过期时间)。

当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

演示:

使用图形化界面创建

 生产者测试

 

 结果

 

队列设置了过期时间而消息也设置了过期时间-----按照时间短的执行 

小结:

  • 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
  • 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。
  • 如果两者都进行了设置,以时间短的为准。

2.2.死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

什么样的消息会成为死信消息:

  • 队列消息长度到达限制
  • 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false
  • 原队列存在消息过期设置,消息到达超时时间未被消费

队列绑定死信交换机:

演示:

使用图形化界面创建:

 

 

 

 生产者测试

 结果

 

2.3.延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。

  2. 新用户注册成功7天后,发送短信问候。

实现方式:

  1. 定时器:性能差---每隔一段时间要进行数据库查询。

  2. 延迟队列

通过消息队列完成延迟队列的功能:

  • 在RabbitMQ中并未提供延迟队列功能。
  • 但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

 

演示:

队列为空

 

创建springboot项目

配置文件

  1. #rabbitmq的配置
  2. spring.rabbitmq.host=192.168.75.129
  3. spring.rabbitmq.username=guest
  4. spring.rabbitmq.password=guest
  5. spring.rabbitmq.virtual-host=/

(1). 引入依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>com.alibaba</groupId>
  4. <artifactId>fastjson</artifactId>
  5. <version>1.2.83</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.springframework.boot</groupId>
  9. <artifactId>spring-boot-starter-amqp</artifactId>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-web</artifactId>
  14. </dependency>
  15. <dependency>
  16. <groupId>org.projectlombok</groupId>
  17. <artifactId>lombok</artifactId>
  18. <optional>true</optional>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-test</artifactId>
  23. <scope>test</scope>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.springframework.amqp</groupId>
  27. <artifactId>spring-rabbit-test</artifactId>
  28. <scope>test</scope>
  29. </dependency>
  30. </dependencies>

(2).创建OrderController.java模拟订单

  1. package com.wqg.controller;
  2. import com.alibaba.fastjson.JSON;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.web.bind.annotation.GetMapping;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. import java.util.HashMap;
  9. import java.util.UUID;
  10. /**
  11. * @ fileName:OrderController
  12. * @ description:订单
  13. * @ author:wqg
  14. * @ createTime:2023/7/13 16:24
  15. */
  16. @RestController
  17. @RequestMapping("/order")
  18. public class OrderController {
  19. @Autowired
  20. private RabbitTemplate rabbitTemplate;
  21. @GetMapping("/saveOrder")
  22. public String save(Integer pid, Integer num) {
  23. //生成一个订单号
  24. String orderId = UUID.randomUUID().toString().replace("-", "");
  25. System.out.println("下单成功,订单号为===" + orderId);
  26. HashMap<Object, Object> map = new HashMap<>();
  27. map.put("orderId", orderId);
  28. map.put("pid", pid);
  29. map.put("num", num);
  30. rabbitTemplate.convertAndSend("pt_exchange", "qy165.aaa", JSON.toJSONString(map));
  31. return "下单成功";
  32. }
  33. }

(3).创建MyListener.java模拟监听

  1. package com.wqg.listener;
  2. import com.alibaba.fastjson.JSON;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.util.HashMap;
  7. /**
  8. * @ fileName:MyListener
  9. * @ description:
  10. * @ author:wqg
  11. * @ createTime:2023/7/13 16:35
  12. */
  13. @Component
  14. public class MyListener {
  15. @RabbitListener(queues = {"dead_queue"})
  16. public void hello(Message message){
  17. byte[] body = message.getBody();
  18. String s = new String(body);
  19. HashMap hashMap = JSON.parseObject(s, HashMap.class);
  20. System.out.println("message==="+hashMap);
  21. //取消订单
  22. System.out.println("取消订单号为==="+hashMap.get("orderId"));
  23. }
  24. }

测试

控制台

 

 

3.如何防止消费者重复消费消息

消息的幂等性---无论操作几次结果都是一样。

  • 生成全局id,存入redis或者数据库,在消费者消费消息之前,查询一下该消息是否有消费过。
  • 如果该消息已经消费过,则告诉mq消息已经消费,将该消息丢弃(手动ack)。
  • 如果没有消费过,将该消息进行消费并将消费记录写进redis或者数据库中。

 简单描述一下需求,如果订单完成之后,需要为用户累加积分,又需要保证积分不会重复累加。那么再mq消费消息之前,先去数据库查询该消息是否已经消费,如果已经消费那么直接丢弃消息。 

演示:

生产者

  1. import com.alibaba.fastjson.JSONObject;
  2. import com.xiaojie.score.entity.Score;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.connection.CorrelationData;
  5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.scheduling.annotation.Async;
  8. import org.springframework.stereotype.Component;
  9. import java.util.UUID;
  10. /**
  11. * @author
  12. * @version 1.0
  13. * @description:发送积分消息的生产者
  14. * @date
  15. */
  16. @Component
  17. @Slf4j
  18. public class ScoreProducer implements RabbitTemplate.ConfirmCallback {
  19. @Autowired
  20. private RabbitTemplate rabbitTemplate;
  21. //定义交换机
  22. private static final String SCORE_EXCHANGE = "ykq_score_exchaneg";
  23. //定义路由键
  24. private static final String SCORE_ROUTINNGKEY = "score.add";
  25. /**
  26. * @description: 订单完成
  27. * @param:
  28. * @return: java.lang.String
  29. * @author xiaojie
  30. * @date:
  31. */
  32. public String completeOrder() {
  33. String orderId = UUID.randomUUID().toString();
  34. System.out.println("订单已完成");
  35. //发送积分通知
  36. Score score = new Score();
  37. score.setScore(100);
  38. score.setOrderId(orderId);
  39. String jsonMSg = JSONObject.toJSONString(score);
  40. sendScoreMsg(jsonMSg, orderId);
  41. return orderId;
  42. }
  43. /**
  44. * @description: 发送积分消息
  45. * @param:
  46. * @param: message
  47. * @param: orderId
  48. * @return: void
  49. * @author
  50. * @date:
  51. */
  52. @Async
  53. public void sendScoreMsg(String jsonMSg, String orderId) {
  54. this.rabbitTemplate.setConfirmCallback(this);
  55. rabbitTemplate.convertAndSend(SCORE_EXCHANGE, SCORE_ROUTINNGKEY, jsonMSg, message -> {
  56. //设置消息的id为唯一
  57. message.getMessageProperties().setMessageId(orderId);
  58. return message;
  59. });
  60. }
  61. @Override
  62. public void confirm(CorrelationData correlationData, boolean ack, String s) {
  63. if (ack) {
  64. log.info(">>>>>>>>消息发送成功:correlationData:{},ack:{},s:{}", correlationData, ack, s);
  65. } else {
  66. log.info(">>>>>>>消息发送失败{}", ack);
  67. }
  68. }
  69. }

消费者

  1. import com.alibaba.fastjson.JSONObject;
  2. import com.rabbitmq.client.Channel;
  3. import com.xiaojie.score.entity.Score;
  4. import com.xiaojie.score.mapper.ScoreMapper;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.apache.commons.lang3.StringUtils;
  7. import org.springframework.amqp.core.Message;
  8. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.messaging.handler.annotation.Headers;
  11. import org.springframework.stereotype.Component;
  12. import java.io.IOException;
  13. import java.util.Map;
  14. /**
  15. * @author
  16. * @version 1.0
  17. * @description: 积分的消费者
  18. * @date
  19. */
  20. @Component
  21. @Slf4j
  22. public class ScoreConsumer {
  23. @Autowired
  24. private ScoreMapper scoreMapper;
  25. @RabbitListener(queues = {"ykq_score_queue"})
  26. public void onMessage(Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {
  27. String orderId = message.getMessageProperties().getMessageId();
  28. if (StringUtils.isBlank(orderId)) {
  29. return;
  30. }
  31. log.info(">>>>>>>>消息id是:{}", orderId);
  32. String msg = new String(message.getBody());
  33. Score score = JSONObject.parseObject(msg, Score.class);
  34. if (score == null) {
  35. return;
  36. }
  37. //执行前去数据库查询,是否存在该数据,存在说明已经消费成功,不存在就去添加数据,添加成功丢弃消息
  38. Score dbScore = scoreMapper.selectByOrderId(orderId);
  39. if (dbScore != null) {
  40. //证明已经消费消息,告诉mq已经消费,丢弃消息
  41. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  42. return;
  43. }
  44. Integer result = scoreMapper.save(score);
  45. if (result > 0) {
  46. //积分已经累加,删除消息
  47. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  48. return;
  49. } else {
  50. log.info("消费失败,采取相应的人工补偿");
  51. }
  52. }
  53. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/556984
推荐阅读
相关标签
  

闽ICP备14008679号