当前位置:   article > 正文

RabbitMQ中死信队列_每一个队列都定义自己的死信队列还是进入同一个死信队列

每一个队列都定义自己的死信队列还是进入同一个死信队列
消息变成死信无非是以下几种情况:
  • 消息被拒绝签收(Nack),并且不允许重回队列。
  • TTL设定的消息有效时间过期。
  • 实际消息数大于队列最大限制数,那么超出最大限制的消息都将会是死信。
那么,出现了死信,应该如何处理呢?

其实,我们可以把这些死信放到自定义的死信队列里去。

什么叫做死信队列?

“死信队列”,顾名思义,就是存放死信的队列。其实它和普通的队列并没有太大差别,唯一的区别就是他的routingkey是"#"。也就是说:只要你路由到我这个死信队列,我都接收。

如何定义死信队列?
生产端:
package com.wy.testrabbitmq.dlx;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author wangyan@163.com
 * @version 1.0
 * @date 2019-06-12 17:20
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "test_dlx_exchange";
        String routingkey = "dlx.dlx";
        String msg = "test dlx message";
        for (int i = 0; i < 3; i++) {
            // deliveryMode=2 持久化,expiration 消息有效时间
            AMQP.BasicProperties properties=new AMQP.BasicProperties().builder()
                    .deliveryMode(2)
                    .contentEncoding("utf-8")
                    .expiration("7000")
                    .build();
            channel.basicPublish(exchangeName, routingkey, true, properties, msg.getBytes());
        }

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
消费端:(主要处理在消费端)
package com.wy.testrabbitmq.dlx;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @author wangyan@163.com
 * @version 1.0
 * @date 2019-06-12 17:21
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "test_dlx_exchange";
        String routingkey = "dlx.#";
        String queueName = "test_dlx_queueName";

        Map<String,Object>map =new HashMap<>();
        //注意: x-dead-letter-exchange 这个key是固定这样写的,value是你自定义的。
        map.put("x-dead-letter-exchange","dlx.exchange");
        //声明队列
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        //注意:arguments需要声明在队列上,声明在交换机上是不会起作用的。
        channel.queueDeclare(queueName, true, false, false, map);
        channel.queueBind(queueName, exchangeName, routingkey);

        //死信队列声明
        channel.exchangeDeclare("dlx.exchange","topic",true,false,null);
        channel.queueDeclare("dlx.queue",true,false,false,null);
        //routingkey指定为#就行,表示只要路由到死信队列的都接收
        channel.queueBind("dlx.queue","dlx.exchange","#");

        channel.basicConsume(queueName, true, new TestConsumer(channel));
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
测试:

如上代码中,我定义了消息的有效时间为7秒。按照逻辑,推送的消息超过7秒没有被消费,就会变成死信。下面我只启动生产端,看管控台变化。

如下,刚启动时我们可以看到我的test_dlx_queueName中有3条消息。
在这里插入图片描述
等待7秒后如下:
在这里插入图片描述
我们可以看到,由于超过了TTL的有效时间,这3条消息变成了死信,被自动转移到了我们自定义的dlx.queue当中。

注意这些地方:
  • //注意: x-dead-letter-exchange 这个key是固定这样写的,value是你自定义的。
    Map<String,Object>map =new HashMap<>();
    map.put(“x-dead-letter-exchange”,“dlx.exchange”);

  • //注意:arguments需要声明在队列上,声明在交换机上是不会起作用的。
    channel.queueDeclare(queueName, true, false, false, map);

  • // routingkey指定为#就行,表示只要路由到死信队列的都接收
    channel.queueBind(“dlx.queue”,“dlx.exchange”,“#”);

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/930118
推荐阅读
相关标签
  

闽ICP备14008679号