赞
踩
RabbitMQ在3.5.0版本的时候提供了优先级队列的实现。客户端通过配置队列的x-max-priority
参数的方式设置一个队列支持的最大优先级(但是不能使用策略的方式配置)以此来声明一个优先级队列。优先级最大值为255、最小值为0(默认值),推荐1~10。生产者可以通过设置Basic.Properties的priority属性设置消息的优先级(值越大,优先级越高)。优先级越高,越先被消费者消费,但是带来的内存、磁盘、CPU开销越高。
如果消费者的消费速度远低于生产者生产消息的速度、Broker有消息积压的情况下,对消息设置的优先级才有意义。
public class TestProducer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setPort(5672); connectionFactory.setHost("127.0.0.1"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); for (int i = 0; i < 10; i++) { AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().priority(i).build(); channel.basicPublish("test-direct-exchange", "test-routing-key", basicProperties, ("hello " + i).getBytes()); } } }
public class TestConsumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); connectionFactory.setPort(5672); connectionFactory.setHost("127.0.0.1"); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("test-direct-exchange", BuiltinExchangeType.DIRECT, true); Map<String, Object> properties = new HashMap<>(); properties.put("x-max-priority", 10); channel.queueDeclare("test-queue", true, false, false, properties); channel.queueBind("test-queue", "test-direct-exchange", "test-routing-key"); channel.basicQos(1); channel.basicConsume("test-queue", false, "test-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收消息 >>> " + new String(body)); channel.basicAck(envelope.getDeliveryTag(), true); } }); } }
运行代码然后看下“Queues”的管理界面,可以看出优先级队列-“test-queue” 在“Features”一列中加了“Pri”的标识
在队列具体明细的“Features”一列中加了“x-max-priority”为10的标识。
idea控制台输出如下:可见消息的优先级越高越先被消费者消费。
如果去掉了 channel.basicQos(1) 方法对于prefetchCount的限制,控制台输出如下:可见此时对消息设置的优先级失去了效果。
prefetchCount:在消息发送给消费者,但是消费者还未确认的情况下,broker可以继续发送消息的数量。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。