当前位置:   article > 正文

RabbitMQ认知篇 - 优先级队列_rabbitmq 优先级

rabbitmq 优先级

优先级队列

RabbitMQ在3.5.0版本的时候提供了优先级队列的实现。客户端通过配置队列的x-max-priority参数的方式设置一个队列支持的最大优先级(但是不能使用策略的方式配置)以此来声明一个优先级队列。优先级最大值为255、最小值为0(默认值),推荐1~10。生产者可以通过设置Basic.Properties的priority属性设置消息的优先级(值越大,优先级越高)。优先级越高,越先被消费者消费,但是带来的内存、磁盘、CPU开销越高。

如果消费者的消费速度远低于生产者生产消息的速度、Broker有消息积压的情况下,对消息设置的优先级才有意义。

public class TestProducer {

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setPort(5672);
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        for (int i = 0; i < 10; i++) {
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().priority(i).build();
            channel.basicPublish("test-direct-exchange", "test-routing-key", basicProperties, ("hello " + i).getBytes());
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
public class TestConsumer {

    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setPort(5672);
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("test-direct-exchange", BuiltinExchangeType.DIRECT, true);
        Map<String, Object> properties = new HashMap<>();
        properties.put("x-max-priority", 10);
        channel.queueDeclare("test-queue", true, false, false, properties);
        channel.queueBind("test-queue", "test-direct-exchange", "test-routing-key");
        channel.basicQos(1);
        channel.basicConsume("test-queue", false, "test-consumer-tag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收消息 >>> " + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), true);
            }
        });
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

运行代码然后看下“Queues”的管理界面,可以看出优先级队列-“test-queue” 在“Features”一列中加了“Pri”的标识

在这里插入图片描述

在队列具体明细的“Features”一列中加了“x-max-priority”为10的标识。

在这里插入图片描述

idea控制台输出如下:可见消息的优先级越高越先被消费者消费。

在这里插入图片描述

如果去掉了 channel.basicQos(1) 方法对于prefetchCount的限制,控制台输出如下:可见此时对消息设置的优先级失去了效果。

prefetchCount:在消息发送给消费者,但是消费者还未确认的情况下,broker可以继续发送消息的数量。

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/763275
推荐阅读
相关标签
  

闽ICP备14008679号