当前位置:   article > 正文

RabbitMQ死信队列详解和使用_死信队列中的消息如何处理

死信队列中的消息如何处理

死信队列

死信,在官网中对应的单词为“Dead Letter”

“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:

  • 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
  • 消息在队列的存活时间超过设置的TTL时间。
  • 消息队列的消息数量已经超过最大队列长度。

那么该消息将成为“死信”。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

死信的处理方式

一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列。
死信的处理方式大致有以下几种:

  • 丢失,如果消息不重要
  • 记录进入死信队列,然后做后续的业务分析或者处理
  • 通过死信队列,有负责监听死信的应用程序进行处理

配置死信队列

工具类:

public class RabbitmqUtil {
    public static Channel getChannel() throws IOException, TimeoutException {
        //通过连接工厂创建新的连接和mq建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
方式1:消息超时进入死信队列

场景:用户下的订单消息,设置过期时间后放入队列,超过时间消息还未被消费,则放入死信队列

代码演示,生产者:

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitmqUtil.getChannel();

        // 正常队列
        String orderExchangeName = "order_exchange";
        String orderQueueName = "order_queue";
        String orderRoutingKey = "order.#";
        Map<String, Object> arguments = new HashMap<String, Object>(16);

        // 死信队列
        String dlxExchangeName = "dlx.exchange";
        String dlxQueueName = "dlx.queue";
        String dlxRoutingKey = "#";

        // 为队列设置队列交换器
        arguments.put("x-dead-letter-exchange", dlxExchangeName);
        // 设置队列中的消息 10s 钟后过期
        arguments.put("x-message-ttl", 10000);

        // 创建正常交换器和队列
        // 第三个参数表示持久化交换机
        // 第四个参数表示交换机不再使用,不自动删除交换机
        channel.exchangeDeclare(orderExchangeName, "topic", true, false, null);
        // 第二个参数表示持久化队列
        // 第三个参数表示该消息队列是否只在当前connection有效,默认是false
        // 第四个参数表示队列没在使用时是否自动删除
        // 第五个参数是其他属性,设置死信队列
        channel.queueDeclare(orderQueueName, true, false, false, arguments);
        channel.queueBind(orderQueueName, orderExchangeName, orderRoutingKey);

        // 创建死信交换器和队列
        channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
        channel.queueDeclare(dlxQueueName, true, false, false, null);
        channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);

        String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 创建订单.";
        channel.basicPublish(orderExchangeName, "order.save", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        System.out.println("发送消息:" + message);
        
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

消费者:

public class Comsumer {
    private static final String QUEUE_NAME = "dlx.queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitmqUtil.getChannel();

        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("死信队列收到消息:"+new String(body));
                System.err.println("deliveryTag:" + envelope.getDeliveryTag());
                // 第二个参数:如果为true,确认之前收到的消息,如果为false,确认当前收到的消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        channel.basicConsume(QUEUE_NAME, consumer);
        TimeUnit.SECONDS.sleep(10000000L);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

在这里插入图片描述

方式2:消息被拒绝,且requeue=false

场景,消费之过滤某些消息

代码演示,生产者:

public class Producer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitmqUtil.getChannel();

        String exchangeName = "test_ack_exchange";
        String routingKey = "ack.save";

        //通过在properties设置来标识消息的相关属性
        for(int i=0;i<5;i++){
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("num",i);
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2)                    // 传送方式 2:持久化投递
                    .contentEncoding("UTF-8")           // 编码方式
                    //.expiration("10000")              // 过期时间
                    .headers(headers)                  //自定义属性
                    .build();
            String message = "hello this is ack message ....."  + i;
            System.out.println("发送消息:" + message);
            channel.basicPublish(exchangeName,routingKey,true,properties,message.getBytes());
        }

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

消费者:

public class Comsumer2 {
    public static void main(String[] args) throws Exception{

        final Channel channel = RabbitmqUtil.getChannel();

        String exchangeName = "test_ack_exchange";
        String exchangeType="topic";
        final String queueName = "test_ack_queue";
        String routingKey = "ack.#";

        //死信队列配置  ----------------
        String deadExchangeName = "dead_exchange";
        String deadQueueName = "dead_queue";
        String deadRoutingKey = "#";
        //死信队列配置  ----------------

        //如果需要将死信消息路由
        Map<String,Object> arguments = new HashMap<String, Object>();
        arguments.put("x-dead-letter-exchange",deadExchangeName);

        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        channel.queueDeclare(queueName,false,false,false,arguments);
        channel.queueBind(queueName,exchangeName,routingKey);

        //死信队列绑定配置  ----------------
        channel.exchangeDeclare(deadExchangeName,exchangeType,true,false,false,null);
        channel.queueDeclare(deadQueueName,true,false,false,null);
        channel.queueBind(deadQueueName,deadExchangeName,deadRoutingKey);
        //死信队列绑定配置  ----------------

        System.out.println("consumer启动 .....");

        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try{
                    Thread.sleep(2000);
                }catch (Exception e){

                }
                Integer num = (Integer)properties.getHeaders().get("num");
                if(num==0){
                    // 第二个参数:true,表示处理消息失败,将消息重新放回队列,如果抛异常或nack(并且第三个参数是true),消息会重写入队列
                    // 第三个参数:true,表示把消费失败的消息重新放入队列的尾部,false不会回到队列
                    channel.basicNack(envelope.getDeliveryTag(),false,false);
                    String message = new String(body, "UTF-8");
                    System.out.println("consumer端的Nack消息是:" + message);
                }else {
                    channel.basicAck(envelope.getDeliveryTag(),false);
                    String message = new String(body, "UTF-8");
                    System.out.println("consumer端的ack消息是:" + message);
                }
            }
        };
        //消息要能重回队列,需要设置autoAck的属性为false,即在回调函数中进行手动签收
        channel.basicConsume(queueName,false, consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

生产者:
在这里插入图片描述
消费者:
在这里插入图片描述
死信队列中有一条消息:
在这里插入图片描述

方式3:队列达到最大长度

代码演示,生产者:

注:这里和方式1类似,并且用的队列名称和方式1相同,由于修改了队列的属性,需要删除旧的队列,因为队列一旦建立好,就不能修改

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitmqUtil.getChannel();

        // 正常队列
        String orderExchangeName = "order_exchange";
        String orderQueueName = "order_queue";
        String orderRoutingKey = "order.#";
        Map<String, Object> arguments = new HashMap<String, Object>(16);

        // 死信队列
        String dlxExchangeName = "dlx.exchange";
        String dlxQueueName = "dlx.queue";
        String dlxRoutingKey = "#";

        // 为队列设置队列交换器
        arguments.put("x-dead-letter-exchange", dlxExchangeName);
        // 设置队列的最大长度
        arguments.put("x-max-length",3);

        // 创建正常交换器和队列
        // 第三个参数表示持久化交换机
        // 第四个参数表示交换机不再使用,不自动删除交换机
        channel.exchangeDeclare(orderExchangeName, "topic", true, false, null);
        // 第二个参数表示持久化队列
        // 第三个参数表示该消息队列是否只在当前connection有效,默认是false
        // 第四个参数表示队列没在使用时是否自动删除
        channel.queueDeclare(orderQueueName, true, false, false, arguments);
        channel.queueBind(orderQueueName, orderExchangeName, orderRoutingKey);

        // 创建死信交换器和队列
        channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
        channel.queueDeclare(dlxQueueName, true, false, false, null);
        channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);

        String message;
        for(int i = 0; i < 5; i++) {
            message = "message-" + i;
            System.out.println("发送消息:" + message);
            channel.basicPublish(orderExchangeName, "order.save", null, message.getBytes());
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

在这里插入图片描述
消费者:

public class Comsumer {
    private static final String QUEUE_NAME = "order_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitmqUtil.getChannel();

        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("死信队列收到消息:"+new String(body));
                System.err.println("deliveryTag:" + envelope.getDeliveryTag());
                // 第二个参数:如果为true,确认之前收到的消息,如果为false,确认当前收到的消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        channel.basicConsume(QUEUE_NAME,false, consumer);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/569690
推荐阅读
相关标签
  

闽ICP备14008679号