赞
踩
总有一些消息需要提前消费处理,下面代码实现一下优先队列
1.生产者
/package com.youxianji; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.util.Util; import java.util.HashMap; import java.util.Map; /** * 优先级队列生产者 */ public class Producer { //队列 public static final String NORMAL_QUEUE = "normal_queue"; public static void main(String[] args) throws Exception{ Channel channel = Util.getChannel(); Map<String, Object> arguments = new HashMap<>(); //官方允许是0-255之间 此处设置1所以允许优先级最大范围为0-10 不要设置过大 浪费CUP和内存 arguments.put("x-max-priority",10); channel.queueDeclare(NORMAL_QUEUE,true,false,false,arguments); for (int i = 1; i <11 ; i++) { String message = "info"+i; //第五条优先消费 if(i == 5){ AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build(); channel.basicPublish("",NORMAL_QUEUE,properties,message.getBytes()); }else { channel.basicPublish("",NORMAL_QUEUE,null,message.getBytes()); } System.out.println("发送消息"+message); } } }
2.消费者
package com.youxianji; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.util.Util; /** * 声明主题交换机 */ public class Receive { public static final String NORMAL_QUEUE = "normal_queue"; public static void main(String[] args) throws Exception { Channel channel = Util.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) ->{ System.out.println("队列消费信息:"+ new String(message.getBody(),"UTF-8")); }; //取消消息时的回调 CancelCallback cancelCallback = consumerTag ->{ System.out.println("消息消费被中断"); }; //接收消息 channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback); } }
运行生产者和消费者,结果如下
RabbitMq界面显示队列设置的最大优先级
生产者正常发送10条消息
消费者优先级高的消息优先消费
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。