赞
踩
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
什么是死信队列
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;
消息成为死信的三种情况:
死信的处理方式
死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,
① 丢弃,如果不是很重要,可以选择丢弃
② 记录死信入库,然后做后续的业务分析或处理
③ 通过死信队列,由负责监听死信的应用程序进行处理
综合来看,更常用的做法是第三种,即通过死信队列,将产生的死信通过程序的配置路由到指定的死信队列,然后应用监听死信队列,对接收到的死信做后续的处理
队列绑定死信交换机:
给队列设置参数:
x-dead-letter-exchange 和 x-dead-letter-routing-key
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfig { //死信交换机 public static final String EXCHANGE_NAME_DLX = "exchange_dlx6"; //死信队列 public static final String QUEUE_NAME_DLX = "queue_dlx6"; //交换机 public static final String EXCHANGE_NAME = "test_exchange_dlx6"; //队列 public static final String QUEUE_NAME = "test_queue_dlx6"; // 1 交换机 @Bean("test_exchange_dlx") public Exchange bootExchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } // 1 死信交换机 @Bean("exchange_dlx") public Exchange dlxExchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME_DLX).durable(true).build(); } //2.Queue 队列 @Bean("test_queue_dlx") public Queue bootQueue(){ Map<String, Object> args = new HashMap<>(); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", EXCHANGE_NAME_DLX); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", "dlx.#"); //设置ttl args.put("x-message-ttl",10000); //最大长度为10 args.put("x-max-length",10); return QueueBuilder.durable(QUEUE_NAME).withArguments(args).build(); } //2.死信 队列 @Bean("queue_dlx") public Queue dlxQueue(){ return QueueBuilder.durable(QUEUE_NAME_DLX).build(); } //3. 死信队列和死信交互机绑定关系 Binding /* 1. 知道哪个队列 2. 知道哪个交换机 3. routing key noargs():表示不指定参数 */ @Bean public Binding bindQueueExchange(@Qualifier("queue_dlx") Queue queue, @Qualifier("exchange_dlx") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs(); } //3. 队列和交互机绑定关系 Binding @Bean public Binding bindQueueExchange1(@Qualifier("test_queue_dlx") Queue queue, @Qualifier("test_exchange_dlx") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("test.dlx.#").noargs(); } }
/**
* 发送测试死信消息:
* 1. 过期时间
* 2. 长度限制
* 3. 消息拒收
*/
@Test
public void testDlx(){
//1. 测试过期时间,死信消息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"test.dlx.hello","我是一条消息,我会死吗?");
}
test开头是正常队列,不是test开头的队列是死信
超过了10秒,没有被消费就进入死信队列
/**
* 发送测试死信消息:
* 1. 过期时间
* 2. 长度限制
* 3. 消息拒收
*/
@Test
public void testDlx(){
for (int i = 0; i < 10; i++) {
//1. 测试过期时间,死信消息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"test.dlx.hello","我是一条消息,我会死吗?"+i);
}
}
test开头是正常队列,不是test开头的队列是死信
发队列最大只能存储10条消息,而发了11条消息,根据先进先出,最先发的消息会进入死信队列。
隔10s,没有被消费,会进入死信队列
在消费者端进行消息拒收
yml
spring:
rabbitmq:
host: 192.168.121.140
port: 5672
username: admin
password: admin
virtual-host: /
listener:
simple:
#表示手动确认
acknowledge-mode: manual
监听
拒绝签收,不重回队列 requeue=false
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; @Component public class DlxListener implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //1.接收转换消息 System.out.println(new String(message.getBody())); //2. 处理业务逻辑 System.out.println("处理业务逻辑..."); int i = 3/0;//出现错误 //3. 手动签收 channel.basicAck(deliveryTag,true); } catch (Exception e) { //e.printStackTrace(); System.out.println("出现异常,拒绝接受"); //4.拒绝签收,不重回队列 requeue=false channel.basicNack(deliveryTag,true,false); } } }
修改生产者测试代码
/**
* 发送测试死信消息:
* 1. 过期时间
* 2. 长度限制
* 3. 消息拒收
*/
@Test
public void testDlx(){
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"test.dlx.hello","我是一条消息,我会死吗?拒绝");
}
进入了死信队列
1.死信交换机和死信队列和普通的没有区别
2.当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
3.消息成为死信的三种情况:
延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
场景:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行取消处理。这时就可以使用延时队列将订单信息发送到延时队列。
需求:
实现方式:
很可惜,在RabbitMQ中并未提供延迟队列功能。
但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
其实和死信队列差不多,加一个ttl时间就可以了
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfig { //死信交换机 public static final String EXCHANGE_NAME_DLX = "exchange_dlx"; //死信队列 public static final String QUEUE_NAME_DLX = "order_que_dlx"; //交换机 public static final String EXCHANGE_NAME = "test_exchange_dlx"; //队列 public static final String QUEUE_NAME = "order_queue"; // 1 交换机 @Bean("test_exchange_dlx") public Exchange bootExchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } // 1 死信交换机 @Bean("exchange_dlx") public Exchange dlxExchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME_DLX).durable(true).build(); } //2.Queue 队列 @Bean("test_queue_dlx") public Queue bootQueue(){ Map<String, Object> args = new HashMap<>(); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", EXCHANGE_NAME_DLX); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", "dlx.order.#"); //设置ttl args.put("x-message-ttl",10000); return QueueBuilder.durable(QUEUE_NAME).withArguments(args).build(); } //2.死信 队列 @Bean("queue_dlx") public Queue dlxQueue(){ return QueueBuilder.durable(QUEUE_NAME_DLX).build(); } //3. 死信队列和死信交互机绑定关系 Binding /* 1. 知道哪个队列 2. 知道哪个交换机 3. routing key noargs():表示不指定参数 */ @Bean public Binding bindQueueExchange(@Qualifier("queue_dlx") Queue queue, @Qualifier("exchange_dlx") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("dlx.order.#").noargs(); } //3. 队列和交互机绑定关系 Binding @Bean public Binding bindQueueExchange1(@Qualifier("test_queue_dlx") Queue queue, @Qualifier("test_exchange_dlx") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("test.order.#").noargs(); } }
@SpringBootTest @RunWith(SpringRunner.class) public class ProducerTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testDelay() throws InterruptedException { //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息 rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "test.order.msg","订单信息:id=1,time=2022年03月30日11:41:47"); //2.打印倒计时10秒 for (int i = 10; i > 0 ; i--) { System.out.println(i+"..."); Thread.sleep(1000); } } }
运行程序创建订单延时队列
OrderListener
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.stereotype.Component; @Component public class OrderListener implements ChannelAwareMessageListener { @RabbitListener(queues = "order_que_dlx") @Override public void onMessage(Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { //1.接收转换消息 System.out.println(new String(message.getBody())); //2. 处理业务逻辑 System.out.println("处理业务逻辑..."); System.out.println("根据订单id查询其状态..."); System.out.println("判断状态是否为支付成功"); System.out.println("取消订单,回滚库存...."); //3. 手动签收 channel.basicAck(deliveryTag,true); } catch (Exception e) { //e.printStackTrace(); System.out.println("出现异常,拒绝接受"); //4.拒绝签收,不重回队列 requeue=false channel.basicNack(deliveryTag,true,false); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。