当前位置:   article > 正文

MQ死信队列

MQ死信队列

面试题:你们是如何保证消息不丢失的?

1、什么是死信

RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。

死信就是消息在特定场景下的一种表现形式,这些场景包括:

1. 消息被拒绝访问,即 RabbitMQ返回 basicNack 的信号时 或者拒绝basicReject

2. 消费者发生异常,超过重试次数 。 其实spring框架调用的就是 basicNack

3. 消息的Expiration 过期时长或队列TTL过期时间。

4. 消息队列达到最大容量

上述场景经常产生死信,即消息在这些场景中时,被称为死信。 

2、什么是死信队列

死信队列就是用于储存死信的消息队列,在死信队列中,有且只有死信构成,不会存在其余类型的消息。

死信队列在 RabbitMQ 中并不会单独存在,往往死信队列都会绑定这一个普通的业务消息队列,当所绑定的消息队列中,有消息变成死信了,那么这个消息就会重新被交换机路由到指定的死信队列中去,我们可以通过对这个死信队列进行监听,从而手动的去对这一消息进行补偿。 人工干预

3、那么,我们到底如何来使用死信队列呢? 

死信队列基本使用,只需要在声明业务队列的时候,绑定指定的死信交换机和RoutingKey即可。

  1. public class DeadConsumer {
  2. //死信交换机
  3. @Bean
  4. public DirectExchange deadExchange(){
  5. return ExchangeBuilder.directExchange("Dead_E01").build();
  6. }
  7. //死信队列
  8. @Bean
  9. public Queue deadQueue1(){
  10. return QueueBuilder.durable("Dead_Q01").build();
  11. }
  12. //死信交换机与死信队列的绑定
  13. @Bean
  14. public Binding deadBinding1(Queue deadQueue1,DirectExchange deadExchange){
  15. return BindingBuilder.bind(deadQueue1).to(deadExchange).with("RK_DEAD");
  16. }
  17. //业务队列
  18. @Bean
  19. public Queue queue1(){
  20. return QueueBuilder
  21. .durable("Direct_Q01")
  22. .deadLetterExchange("Dead_E01")
  23. .deadLetterRoutingKey("RK_DEAD")
  24. //.ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息
  25. //.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列
  26. .build();
  27. }
  28. //业务交换机
  29. @Bean
  30. public DirectExchange exchange(){
  31. return ExchangeBuilder.directExchange("Direct_E01").build();
  32. }
  33. //业务交换机与队列的绑定
  34. @Bean
  35. public Binding binding1(Queue queue1,DirectExchange exchange){
  36. return BindingBuilder.bind(queue1).to(exchange).with("RK01");
  37. }
  38. //@RabbitListener(queues = "Direct_Q01")
  39. public void receiveMessage(OrderingOk msg) {
  40. log.info("消费者1 收到消息:"+ msg );
  41. int i= 5/0;
  42. }
  43. // @RabbitListener(queues = "Direct_Q01")
  44. // public void receiveMessage(OrderingOk msg,Message message, Channel channel) throws IOException {
  45. //
  46. // long deliveryTag = message.getMessageProperties().getDeliveryTag();
  47. //
  48. // System.out.println("消费者1 收到消息:"+ msg +" tag:"+deliveryTag);
  49. //
  50. // channel.basicReject(deliveryTag, false);
  51. // try {
  52. // // 处理消息...
  53. // int i= 5/0;
  54. // // 如果处理成功,手动发送ack确认 ,Yes
  55. // channel.basicAck(deliveryTag, false);
  56. // } catch (Exception e) {
  57. // // 处理失败,可以选择重试或拒绝消息(basicNack或basicReject) NO
  58. // channel.basicNack(deliveryTag, false, false); // 并重新入队
  59. //
  60. // }
  61. }
  62. //}

4. 自动应答死信配置

  1. #-------------MQ 高级配置---------
  2. #预抓取数量
  3. spring.rabbitmq.listener.simple.prefetch=50
  4. #设置消费者手动应答模式
  5. spring.rabbitmq.listener.simple.acknowledge-mode = auto
  6. #开启自动应答重试机制
  7. spring.rabbitmq.listener.simple.retry.enabled=true
  8. #默认重试3次
  9. spring.rabbitmq.listener.simple.retry.max-attempts=3
  10. #重试间隔时间 单位ms
  11. spring.rabbitmq.listener.simple.retry.initial-interval=1000ms
  12. #时间间隔倍数,默认是1倍
  13. spring.rabbitmq.listener.simple.retry.multiplier=2
  14. #最大间隔时间
  15. spring.rabbitmq.listener.simple.retry.max-interval=5000ms

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/406778
推荐阅读
相关标签
  

闽ICP备14008679号