当前位置:   article > 正文

RabbitMQ之死信队列_rabbitmq 查看是否死q

rabbitmq 查看是否死q

1、死信队列的概念

  先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
  应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

2、死信的来源

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

3、死信实战

3.1 代码架构图

在这里插入图片描述
一个生产者,两个消费者,当消息满足死信条件的时候被送入dead-queue这个死信队列。
zhangsan是normal_exchange普通交换机和normal-queue普通队列绑定的routingKey
lisi是dead_exhange死信交换机和dead-queue死信队列绑定的routingKey

3.2 模拟消息TTL过期

生产者代码

public class Producer {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE="normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //死信消息  设置TTL时间  time to live  单位是ms  10000ms=10s
        AMQP.BasicProperties properties=
                new AMQP.BasicProperties()
                .builder().expiration("10000")
                .build();

        for (int i = 1; i <11; i++) {
            String message = "info" + i;    //info1.....info10
           
         channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

消费者01:

public class Consumer01 {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE="normal_exchange";
    //死信交换机的名称
    public static final String DEAD_EXCHANGE="dead_exchange";
    //普通队列的名称
    public static final String NORMAL_QUEUE="normal_queue";
    //死信队列的名称
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception {
        //获取信道
        Channel channel = RabbitMqUtils.getChannel();
        //声明死信和普通交换机  类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明普通队列
        Map<String,Object> arguments=new HashMap<>();
        //过期时间   10s=10000ms
//        arguments.put("x-message-ttl",10000);
        //正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","lisi");
        //设置正常队列的长度的限制
//        arguments.put("x-max-length",6);

        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        
        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //绑定普通的交换机与普通的队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        //绑定死信的交换机与死信的队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        System.out.println("等待接收消息......");

        DeliverCallback deliverCallback=(consumerTag, message) -> {
           String msg= new String(message.getBody(),"UTF-8");
               System.out.println("Consumer01接收的消息是:"+msg);
               channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        //开启手动应答
        channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag -> {});
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

消费者02:这个消费者代码最简单了,只负责接收死信队列中的消息即可。

public class Consumer02 {
    //死信队列的名称
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception {
        //获取信道
        Channel channel = RabbitMqUtils.getChannel();

        System.out.println("等待接收消息......");

        DeliverCallback deliverCallback=(consumerTag, message) -> {
            System.out.println("Consumer02接收的消息是:"+new String(message.getBody(),"UTF-8"));
        };

        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> {});
        
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

先启动消费者C1

  先建立交换机与队列的绑定关系(这些代码我们写在了消费者c1里面),然后将消费者C1关闭,这时如果启动生产者发送消息的话,由于消费者C1线程已经关闭,所以消息会全部进入死信队列中。
  我们这时启动生产者线程发送10条消息,由于消费者C1已经关闭,我们在配置中指定的是若消息被拒绝则会进入死信队列dead-queue,所以我们在启动生产者之后,消息会全部进入死信队列,我们可以通过rabbitmq的控制台查看

通过上面的图片可以看出,normal_queue队列中的10条消息已经全部进入了死信队列dead_queue

  由架构图可知,死信队列中的消息会被消费者C2消费,那我们现在启动消费者C2线程,按理说启动之后,死信队列中的10条消息都会被C2消费掉。
C2的代码:

/**
 * 死信队列 实战
 *
 * 消费者2
 */
public class Consumer02 {
    //死信队列的名称
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception {
        //获取信道
        Channel channel = RabbitMqUtils.getChannel();

        System.out.println("等待接收消息......");

        DeliverCallback deliverCallback=(consumerTag, message) -> {
            System.out.println("Consumer02接收的消息是:"+new String(message.getBody(),"UTF-8"));
        };

        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> {});

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

通过下图可以看到,死信队列中的消息成功被C2消费掉,测试成功。

3.3 模拟队列达到最大长度

  消费者C2代码不变,我们现在设置普通队列normal_queue最多只能接收6条消息,生产者发送10条消息,那么最后的四条消息会被送入死信队列(这里先别让消费者C1消费,要不然消费的太快,看不到效果)。
生产者代码修改:(注释掉TTL)

public class Producer {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE="normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //死信消息  设置TTL时间  time to live  单位是ms  10000ms=10s
//        AMQP.BasicProperties properties=
//                new AMQP.BasicProperties()
//                .builder().expiration("10000")
//                .build();

        for (int i = 1; i <11; i++) {
            String message = "info" + i;    //info1.....info10
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());
//            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
        }
        
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

C1代码修改
在normal_queue队列的声明处加入以下配置

然后启动C1线程,启动之后关闭

生产者发送10条消息,启动生产者线程

  从上图中的死信队列和普通队列中的消息条数可以发现,符合实验预期,由于我们关闭了消费者C1这个线程,所以消息在进入normal_queue的时候不会被消费,但由于我们设置了它最多只能接受6条消息,所以剩下的4条信息会进入死信队列。

3.4 模拟消息被拒绝

消费者C1:(启动之后关闭,模拟其接收不到消息)

public class Consumer01 {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE="normal_exchange";
    //死信交换机的名称
    public static final String DEAD_EXCHANGE="dead_exchange";
    //普通队列的名称
    public static final String NORMAL_QUEUE="normal_queue";
    //死信队列的名称
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception {
        //获取信道
        Channel channel = RabbitMqUtils.getChannel();
        //声明死信和普通交换机  类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明普通队列
        Map<String,Object> arguments=new HashMap<>();
        //过期时间   10s=10000ms
//        arguments.put("x-message-ttl",10000);
        //正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","lisi");
        //设置正常队列的长度的限制
//        arguments.put("x-max-length",6);

        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        
        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //绑定普通的交换机与普通的队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        //绑定死信的交换机与死信的队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        System.out.println("等待接收消息......");

        DeliverCallback deliverCallback=(consumerTag, message) -> {
           String msg= new String(message.getBody(),"UTF-8");
           if(msg.equals("info5")){ //拒绝info5 让其成为死信
               System.out.println("Consumer01接收的消息是:"+msg+":此消息是被C1拒绝的");
               //false不塞回普通队列  让其成为死信
               channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
           }else{
               System.out.println("Consumer01接收的消息是:"+msg);
               channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
           }
        };
        //开启手动应答
        channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag -> {});

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

启动生产者(代码和上面的一样)
此时的消息队列

  我们这时候先启动消费者C1再启动消费者C2,我们设置的是消费者C1会拒绝info5这条消息 让其进入死信队列,那么我们启动消费者C2之后,这条被拒绝的消息就会被C2消费。
C1:

C2:

测试成功

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/1018233
推荐阅读
相关标签
  

闽ICP备14008679号