当前位置:   article > 正文

RabbitMQ优先级消息_rabbitmq消息积压优先消费

rabbitmq消息积压优先消费

使用场景

 

对于Mq中可能产生堆积消息的情景来说,如果消息都是不需要实时处理的,那可以等待消费者慢慢的去处理就行了,然后实际使用中我们会遇到这样一种情况,就是mq队列堆积消息很多,但是每条消息处理的有的很慢有的很快,所以此时这种场景就比较适合使用mq的优先级来保证消息的提前消费,让处理慢的消息最后消费

 

 

下面使用java链接mq演示一下消息的处理

初始化优先级队列

 

# 交换机        public static final String EXCHANGE_TEST = "test";# 路由key    public static final String PRIORITY_ROUTE_KEY_TEST = "priority.test";# 队列    public static final String QUEUE_PRIORITY = "queue.priority.1";    @Bean    public TopicExchange testExchange() {        return new TopicExchange(EXCHANGE_TEST);    }    # 声名优先级队列,制定最大优先级10       @Bean    public Queue priority_Queue() {        Map<String, Object> map = new HashMap<String, Object>(1);        map.put("x-max-priority", 10);        return new Queue(QUEUE_PRIORITY, true, false, false, map);    }    # 队列与交换机的绑定   @Bean    public Binding myBindingPriority() {        return new Binding(QUEUE_PRIORITY, Binding.DestinationType.QUEUE, EXCHANGE_TEST, PRIORITY_ROUTE_KEY, null);    }

 

 

消息生产者代码

 

 private RabbitTemplate rabbitTemplate;     public void sendPriorityMsg(String exchange, String routeKey, Object msg, Integer priority) {        MessagePostProcessor messagePostProcessor = message -> {            MessageProperties messageProperties = message.getMessageProperties();            //设置编码            messageProperties.setContentEncoding("utf-8");            //设置过期时间10*1000毫秒            messageProperties.setPriority(priority);            return message;        };        rabbitTemplate.convertAndSend(exchange, routeKey, msg, messagePostProcessor);    }

 

 

发送消息

 

public String sendMsg() {            // 发送了两个优先级的任务        msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务1", 1);        msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务1-2", 1);        msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务1-3", 1);        msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务1-4", 1);        msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务1-5", 1);        msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务2-5", 5);        msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务2-4", 5);        msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务2-3", 5);        msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务2-2", 5);        msgProducer.sendPriorityMsg(RabbitConfig.EXCHANGE_TEST, RabbitConfig.PRIORITY_ROUTE_KEY, "测试优先级任务2-1", 5);        return "success";    }

 

 

启动程序发送消息

 

此时rabbitmq的管理页面可以看到队列已经创建成功

消费者也是有一个,然后与test交换机进行了绑定

下面开始发送消息,因为我们目前没有订阅队列,也就是和图中类似没有消费者产生,所以现在队列中堆积了10条消息

 

添加消费者

 

    @RabbitListener(queues = RabbitConfig.QUEUE_PRIORITY)    @RabbitHandler    public void priority(String content) {        log.info("交换机 test 的消息:" + content);    }

 

此时重新启动程序,消费者消息会自动接受处理了,打印结果如下

此时我们可以明显的看到优先级5的消息优先被处理了,所以到此为止,一个简单的mq优先级消息队列就结束了

 

 

 

 

步骤

1:创建交换机,初始化指定最大优先级的队列

2: 创建生产者发送消息的优先级指定信息

3: 创建消费者监听

相关源码已经提交github或者点击原文

https://github.com/TianPuJun/springboot-demo/tree/master/springboot-rabbitmq

微信号:c1041067258

JavaLog

跟着无痕轻松Java

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/763249
推荐阅读
相关标签
  

闽ICP备14008679号