赞
踩
死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;
以上是个人的通俗解释,专业术语解释的比较正规点大家可以参考,主要想搞清楚这个概念,不同的消息中间件大概都有自身对于死信或者死信队列的处理方式,下面重点要说说。
消息变成死信有以下几种情况
死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,
综合来看,更常用的做法是第三种,即通过死信队列,将产生的死信通过程序的配置路由到指定的死信队列,然后应用监听死信队列,对接收到的死信做后续的处理。
演示死信队列处理,这里设定产生死信的场景是设置队列中的消息有效期为10s,超出时间未被消费者接收,就会自动添加到死信队列,消费者端监听死信队列,并进行业务处理。
演示环境
RabbitMQ3.8.5+Spring Boot 2.3.0 RELEASE+JAVA 8
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- rabbitmq:
- host: localhost
- port: 5673
- username: guest
- password: guest
- # 开启消息确认机制
- publisher-confirm-type: correlated
- # 开启消息发送到队列失败返回
- publisher-returns: true
- package com.tdrc.common.core.rabbitmq;
-
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * @author dpf
- * @version 1.0
- * @date 2020-6-22 9:52
- * @instruction ...
- */
- @Configuration
- public class RabbitExChangeConfig {
- /**
- * 业务交换机
- */
- public static final String DESTINATION_NAME = "rabbitMq_direct";
- /**
- * 业务队列名称
- */
- public static final String SMS_QUEUE = "Sms_msg";
- /**
- * 死信队列交换机名称
- */
- public static final String DEAD_LETTER_EXCHANGE_NAME="deadLetter_direct";
- /**
- * 死信队列名称
- */
- public static final String DEAD_LETTER_QUEUE = "deadLetter_queue";
- /**
- * RouteKey
- */
- public static final String SMS_ROUTING_KEY = "sms";
- /**
- * 配置死信交换机
- * @return
- */
- @Bean
- public DirectExchange deadLetterDirectExchange(){
- return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
- }
- /**
- * 配置死信队列
- * @return
- */
- @Bean
- public Queue deadLetterQueue(){
- return new Queue(DEAD_LETTER_QUEUE);
- }
- /**
- * 绑定死信队列和死信交换机
- * @return
- */
- @Bean
- Binding deadLetterBindingDirect() {
- return BindingBuilder.bind(deadLetterQueue()).to(deadLetterDirectExchange()).with(SMS_ROUTING_KEY);
- }
- /**
- * 配置队列
- * @return
- */
- @Bean
- public Queue smsDirectQueue() {
- Map<String, Object> args = new HashMap<>(16);
- // 队列消息过期时间
- args.put("x-message-ttl", 10000);
- args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
- args.put("x-dead-letter-routing-key", SMS_ROUTING_KEY);
- // args.put("x-expires", 5000);队列过期时间
- // args.put("x-max-length",5 );
- return new Queue(SMS_QUEUE, true,false,false,args);
- }
- /**
- * 配置交换机
- * @return
- */
- @Bean
- public DirectExchange directExchange() {
- return new DirectExchange(DESTINATION_NAME);
- }
-
- /**
- * 交换机与队列绑定
- * @return
- */
- @Bean
- Binding smsBindingDirect() {
- return BindingBuilder.bind(smsDirectQueue()).to(directExchange()).with(SMS_ROUTING_KEY);
- }
-
-
- @Bean
- public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
- SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory =
- new SimpleRabbitListenerContainerFactory();
- //这个connectionFactory就是我们自己配置的连接工厂直接注入进来
- simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
- //这边设置消息确认方式由自动确认变为手动确认
- simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
- //设置消息预取数量
- // simpleRabbitListenerContainerFactory.setPrefetchCount(1);
- return simpleRabbitListenerContainerFactory;
- }
- /**
- * 每个rabbitTemplate方法只可以有一个回调,不然会报错 only one ConfirmCallback is supported by each RabbitTemplate,解决办法是配成多利的
- *
- * @param connectionFactory
- * @return
- */
- @Bean
- public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
- RabbitTemplate template = new RabbitTemplate(connectionFactory);
- //成功回调
- template.setConfirmCallback(new Callback());
- // 开启mandatory模式(开启失败回调)
- template.setMandatory(true);
- //失败回调
- template.setReturnCallback(new Callback());
-
- return template;
- }
- }
- @Resource
- private RabbitTemplate rabbitTemplate;
-
- @GetMapping("/sendSms")
- private void sendSms() throws InterruptedException {
- String msg = "HelloWorld rabbitmq";
- for(Integer i=0;i<10;i++){
- CorrelationData correlationData = new CorrelationData(i.toString());
- rabbitTemplate.convertAndSend(RabbitExChangeConfig.DESTINATION_NAME, RabbitExChangeConfig.SMS_ROUTING_KEY, msg+i ,correlationData);
- }
-
- }
- @GetMapping("/sendSms")
- private void sendSms() throws InterruptedException {
- String msg = "HelloWorld rabbitmq";
- for(Integer i=0;i<10;i++){
- CorrelationData correlationData = new CorrelationData(i.toString());
- rabbitTemplate.convertAndSend(RabbitExChangeConfig.DESTINATION_NAME, RabbitExChangeConfig.SMS_ROUTING_KEY, msg+i ,correlationData);
- }
-
- }
启动程序前消息对列中无程序内创建的业务队列和死信队列。
消息发送后会产生两个交换机和两个队列,一个队列是Sms_msg,一个是deadLetter_queue,消息记录为10条
10s后的队列结果如图,由于生产端发送消息时指定了消息的过期时间为10s,而此时没有消费端进行消费,消息便被路由到死信队列中。
程序中添加死信队列消费者监控代码,重新启动程序
- @RabbitListener(queues = RabbitExChangeConfig.DEAD_LETTER_QUEUE, containerFactory = "simpleRabbitListenerContainerFactory")
- public void reciveDeadLetter(Message message, Channel channel, @Headers Map<String, Object> headers) throws IOException {
- long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
- try {
-
- System.out.println("死信队列消费者收到消息 : " + new String(message.getBody(), "UTF-8"));
- /**
- * 手动ack
- * deliveryTag:该消息的index
- * multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
- */
- channel.basicAck(deliveryTag, false);
- } catch (Exception e) {
- //消息退回 (可以在可视化界面看到)
- //批量退回 退回之后重回消息队列 true false的话就是丢弃这条信息,如果配置了死信队列,那这条消息会进入死信队列
- channel.basicNack(deliveryTag, false, true);
- //单条退回 channel.basicReject();
- }
- }
程序启动后消费端接收到死信对列里的信息
测试查看死信消息队列中没有需要接收的消息。
实际环境我们还需要对死信队列进行一个监听和处理,当然具体的处理逻辑和业务相关,这里只是简单演示死信队列是否生效。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。