当前位置:   article > 正文

RabbitMQ之死信交换机

RabbitMQ之死信交换机

前言

消息队列是分布式系统中常用的组件,用于异步通信、解耦和提高系统可靠性。然而,在实际应用中,难免会遇到一些异常情况,例如消息处理失败、超时等。为了更好地处理这些异常情况,死信交换机(Dead Letter Exchange)应运而生

一.什么是死信?

在了解死信交换机之前我们先了解什么是死信

消息变成死信一般是由于以下几种情况:

1. 重试次数超限: 消息在处理过程中多次重试仍然失败,达到预定的重试次数上限;
2. 消息被拒绝: Basic.Reject/Basic.Nack ),并且设置 requeue 参数为 false
3.消息过期:消息在队列中等待时间过长,超过了设置的过期时间
4. 队列满: 当消息队列的长度达到上限时,新的消息可能成为死信。

二.什么是死信交换机?

死信交换机是消息队列系统中的一种特殊交换机,用于处理那些无法被正常消费的消息。当消息满足一定的条件,例如重试次数达到上限或者处理失败,就会被标记为死信(Dead Letter)并被发送到死信交换机

1.死信交换机工作原理

1).消息被标记为死信:

当消息无法被正常消费时,可以通过设定一些条件将其标记为死信。这些条件可能包括消息的重试次数、过期时间等。

(2).发送到死信交换机:

一旦消息被标记为死信,它将被发送到预先指定的死信交换机。

(3).重新处理或记录:

死信交换机可以将死信消息重新发送到其他队列进行处理,也可以将其记录到日志中供后续分析。

三.优势与应用场景 

1.优势

  • 错误隔离: 死信交换机可以将异常消息隔离,避免影响正常消息的处理。
  • 重试机制: 通过重新发送死信消息,可以实现简单的重试机制,提高消息的可靠性。
  • 异常处理: 将死信记录到日志中,有助于系统运维人员进行故障排查。

2.应用场景

应用场景:

  • 业务异常: 处理业务逻辑失败的消息,例如订单支付失败。
  • 超时处理: 处理处理时间超过预定阈值的消息,避免长时间占用资源。
  • 重试机制: 对于多次重试后仍然无法处理的消息,标记为死信,进行后续处理。

四.实战与应用

1.时间过期进入到死信队列

创建绑定死信延迟队列

  1. //死信、延迟队列
  2. @Bean
  3. public Queue queueA() {
  4. Map<String, Object> config = new HashMap<>();
  5. //message在该队列queue的存活时间最大为10秒
  6. config.put("x-message-ttl", 10000);
  7. //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
  8. config.put("x-dead-letter-exchange", "ExchangeB");
  9. //x-dead-letter-routing-key参数是给这个DLX指定路由键
  10. config.put("x-dead-letter-routing-key", "bb");
  11. return new Queue("queueA", true, true, false, config);
  12. }
  13. @Bean
  14. public DirectExchange ExchangeA(){
  15. return new DirectExchange("ExchangeA");
  16. }
  17. @Bean
  18. public Binding bindingA(){
  19. return BindingBuilder.bind(queueA()).to(ExchangeA()).with("aa");
  20. }
  21. @Bean
  22. public Queue queueB() {
  23. return new Queue("queueB");
  24. }
  25. @Bean
  26. public DirectExchange ExchangeB(){
  27. return new DirectExchange("ExchangeB");
  28. }
  29. @Bean
  30. public Binding bindingB(){
  31. return BindingBuilder.bind(queueB()).to(ExchangeB()).with("bb");
  32. }

注意点:

创建一个名为 "queueA" 的队列。

通过配置Map设置了一些队列的属性:

x-message-ttl 表示消息在队列中的最大存活时间,这里设置为10秒。

x-dead-letter-exchange 表示该队列的死信交换器(DLX),即当消息在队列中过期或被拒绝时,将被发送到指定的交换器。

x-dead-letter-routing-key 表示消息被发送到死信交换器时的路由键。

小结:

这样的配置中,queueA 是一个具有延迟特性的队列,当消息在该队列中存活时间超过设定的10秒时,消息会被发送到名为 "ExchangeB" 的死信交换器,并且使用路由键 "bb"。而 queueB 则是处理死信消息的队列,它接收来自 "ExchangeB" 交换器的消息

我们通过测试方法send6向队列queueA中发送一条消息,接收成功后在时间过期时进入死信队列queueB中 

 

2.手动确认是否接收

配置确认模式为手动

  1. spring:
  2. rabbitmq:
  3. listener:
  4. simple:
  5. acknowledge-mode: manual

编写消费者接收方式为手动,若为true,则为接收,false则为不接收,不接收的同时会进入到死信队列

  1. package com.yu.consumer;
  2. import com.rabbitmq.client.Channel;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  5. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  6. import org.springframework.amqp.support.AmqpHeaders;
  7. import org.springframework.messaging.handler.annotation.Header;
  8. import org.springframework.stereotype.Component;
  9. @Component
  10. @SuppressWarnings("all")
  11. @Slf4j
  12. @RabbitListener(queues = "queueA")
  13. public class ReceiverQA {
  14. @RabbitHandler
  15. public void process(String id, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
  16. log.error("QA接收到:" + id);
  17. // channel.basicAck(tag,true);
  18. channel.basicReject(tag,true);
  19. Thread.sleep(1000);
  20. }
  21. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/87205
推荐阅读
相关标签
  

闽ICP备14008679号