当前位置:   article > 正文

「RabbitMQ」实现消息的优先级_java设置rabbitmq消息优先级

java设置rabbitmq消息优先级

目录

1、在生产者方设置优先级

2、创建消息监听器

3、配置Bean


在 Spring Boot 中,要使用 RabbitMQ 的消息优先级功能,可以通过以下步骤进行配置和实现

1、在生产者方设置优先级

  1. @Autowired
  2. private RabbitTemplate rabbitTemplate;
  3. public void sendMessageWithPriority(String message, int priority) {
  4. Message amqpMessage = MessageBuilder
  5. .withBody(message.getBytes())
  6. .setHeader("Priority", priority)
  7. .build();
  8. rabbitTemplate.convertAndSend("myexchange", "myroutingkey", amqpMessage);
  9. }


上述代码中,在 RabbitMQ 消息头中添加了一个名为 Priority 的属性,用于表示消息优先级。

2、创建消息监听器

  1. @Component
  2. public class MessageHandler {
  3. @RabbitListener(
  4. queues = "myqueue",
  5. containerFactory = "priorityListenerContainerFactory",
  6. concurrency = "5"
  7. )
  8. public void handleMessage(Message message) {
  9. System.out.println("Received message with priority: " + message.getMessageProperties().getPriority());
  10. }
  11. }

上述代码中在 @RabbitListener 注解中设置了三个参数:

  • queues指定需要监听的队列;
  • containerFactory指定容器工厂,用于创建消费者容器并设置优先级队列相关的属性。需要事先在配置文件或 Java 配置类中定义支持优先级队列的 RabbitListenerContainerFactory
  • concurrency指定并发消费者的数量。

 3、配置Bean

在配置文件或 Java 配置类中定义 RabbitListenerContainerFactory,用于支持优先级队列相关的属性和值:

  1. @Configuration
  2. public class RabbitMQConfig {
  3. @Autowired
  4. private ConnectionFactory connectionFactory;
  5. @Bean
  6. public RabbitListenerContainerFactory<?> priorityListenerContainerFactory() {
  7. SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  8. factory.setConnectionFactory(connectionFactory);
  9. factory.setConcurrentConsumers(5);
  10. factory.setMaxConcurrentConsumers(10);
  11. factory.setAfterReceivePostProcessors(
  12. message -> {
  13. MessageProperties properties = message.getMessageProperties();
  14. int priority = (int) properties.getHeaders().get("Priority");
  15. properties.setPriority(priority);
  16. return message;
  17. }
  18. );
  19. factory.setPriorityComparator(new PriorityMessageConverter());
  20. return factory;
  21. }
  22. // 自定义的消息转换器
  23. static class PriorityMessageConverter implements Comparator<Message> {
  24. @Override
  25. public int compare(Message o1, Message o2) {
  26. Integer p1 = o1.getMessageProperties().getPriority();
  27. Integer p2 = o2.getMessageProperties().getPriority();
  28. return p2.compareTo(p1);
  29. }
  30. }
  31. }


上述代码中,我们定义了一个支持优先级队列的 SimpleRabbitListenerContainerFactory,并在其中设置了以下属性:

    • connectionFactory:连接工厂;
    • concurrentConsumersmaxConcurrentConsumers:最小和最大的并发消费者数量;
    • afterReceivePostProcessors:一个消息后处理器,用于在接收消息后将 Header 中的优先级设置为 MessageProperties 中的优先级;
    • priorityComparator:设置消息比较器,使具有高优先级的消息先被消费。

另外,需要注意,因为 Spring AMQP 默认使用的是 SimpleMessageConverter,该消息转换器不支持优先级队列相关的属性,因此需要自定义一个消息转换器来实现消息的优先级相关的转换。最后,启动应用程序,生产者发送带有优先级的消息,消费者将按照优先级从高到低消费消息。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/763231
推荐阅读
相关标签
  

闽ICP备14008679号