当前位置:   article > 正文

Rabbitmq 优先队列_rabbitmq优先级队列原理

rabbitmq优先级队列原理

        队列的消费顺序一般是先进先出。但是在某些订单中业务中,我们需要给vip用户后下单,先出货的特殊权限,这时候就需要用到优先级队列
原理,在原来先进先出的逻辑上,给队列备注优先级,最后的顺序如下:
优先级高–>优先级低–>没有备注优先级
ps:优先级的范围为0-255


  • rabbitmq工具类
  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. public class RabbitMQUtils {
  5. // 创建连接mq的链接工厂对象
  6. private static ConnectionFactory connectionFactory;
  7. static {
  8. connectionFactory = new ConnectionFactory();
  9. // 设置rabbitmq服务器所在的ip port
  10. connectionFactory.setHost("192.168.1.100");
  11. connectionFactory.setPort(5672);
  12. // 设置rabbitmq所在的虚拟主机
  13. connectionFactory.setVirtualHost("/");
  14. // 设置rabbitmq所在服务的用户名 密码
  15. connectionFactory.setUsername("guest");
  16. connectionFactory.setPassword("guest");
  17. }
  18. // 定义提供链接对象的方法
  19. public static Connection getConnection(){
  20. try {
  21. return connectionFactory.newConnection();
  22. } catch (Exception e) {
  23. e.printStackTrace();
  24. }
  25. return null;
  26. }
  27. // 关闭通道和链接的方法
  28. public static void closeChannelAndConnection(Channel channel, Connection connection){
  29. try {
  30. if(channel!=null) channel.close();
  31. if(connection!=null) connection.close();
  32. }catch (Exception e){
  33. e.printStackTrace();
  34. }
  35. }
  36. }

  • 生产者
  1. import cn.my.utils.RabbitMQUtils;
  2. import com.rabbitmq.client.AMQP;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import java.io.IOException;
  6. import java.util.HashMap;
  7. import java.util.Map;
  8. import java.util.concurrent.TimeoutException;
  9. /**
  10. 优先队列 生产者
  11. **/
  12. public class Product {
  13. //队列名称
  14. public static final String ORDER_QUEUE = "ORDER_QUEUE";
  15. //发消息
  16. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  17. //获取信道
  18. Connection connection = RabbitMQUtils.getConnection();
  19. Channel channel = connection.createChannel();
  20. //参数
  21. Map<String, Object> argument = new HashMap<>();
  22. argument.put("x-max-priority", 10);//设置优先级范围0-10,官方允许值是0-255。设置过大会浪费内存
  23. //生成队列,
  24. //不创建交换机,走默认的交换机
  25. //1.名称
  26. //2.队列消息是否持久化(否:存内存,是:存磁盘。默认否)
  27. //3.队列是否只供一个消费者消费,默认否
  28. //4.最后一个消费者断开连接后,是否自动删除。
  29. //5.其他参数
  30. channel.queueDeclare(ORDER_QUEUE, true, false, false, argument);
  31. //发消息
  32. String message = "this is QUEUE_P";
  33. //持续发送消息
  34. for (int i = 0; i < 10; i++) {
  35. //1.交换机,简单版本不考虑,直接空字符串即可(默认/无名交换机)
  36. //2.路由key,直接写队列名即可
  37. //3.参数,忽略
  38. //4.消息体
  39. if (i == 5) {
  40. AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(10).build(); // priority 优先级
  41. channel.basicPublish("", ORDER_QUEUE, properties, (message + i).getBytes());
  42. } else {
  43. channel.basicPublish("", ORDER_QUEUE, null, (message + i).getBytes());
  44. }
  45. }
  46. System.out.println("消息发送成功");
  47. }
  48. }

  • 生产者
  1. /**
  2. 优先队列 消费者
  3. **/
  4. public class Consume {
  5. //队列名称
  6. public static final String ORDER_QUEUE = "ORDER_QUEUE";
  7. //发消息
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. //获取信道
  10. Connection connection = RabbitMQUtils.getConnection();
  11. Channel channel = connection.createChannel();
  12. //消费者未成功消费时候的回调方法
  13. DeliverCallback deliverCallback = (consumerTag, message) -> {
  14. System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));
  15. //手动应答
  16. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
  17. };
  18. //消费者取消消费的回调方法
  19. CancelCallback cancelCallback = consumerTag -> {
  20. System.out.println("消费者取消消费的回调方法");
  21. };
  22. //消费消息
  23. //1.队列名
  24. //2.消费成功后是否自动应答
  25. //3.消费者成功消费时候的回调
  26. //4.消费者取消消费的回调方法
  27. Boolean actoAck = false;
  28. channel.basicConsume(ORDER_QUEUE, actoAck, deliverCallback, cancelCallback);
  29. }
  30. }

  • 测试

先启动生产者,再启动消费者

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/763242
推荐阅读
相关标签
  

闽ICP备14008679号