当前位置:   article > 正文

rabbitmq的幂等优先级队列和惰性队列_rabbitmq 队列优先级

rabbitmq 队列优先级

theme: orange

前言

前面的几个章节我们学习了rabbitmq的消息确认和消息回退机制, 还有备份exchange功能

今天来聊聊幂等, 优先级队列和惰性队列

rabbitmq幂等

什么是幂等? 为什么需要幂等?

类似于mysql的一致性, 支付了一个商品的价格, 如果商品购买失败, 再次购买该商品, 购买成功, 不会出现扣了两次钱买了一件商品的事情

什么原因导致消息重复发送的?

在回答这个问题之前, 我们需要了解, 什么是消息送达?

对于rabbitmq来说消息送达, 是将消息送到消费者queue中

对于程序员来说, 消息送达是消息投递到消费者手中

谁需要注意幂等问题?

消费者和生产者会出现重复消息的问题, 但是问题只需要在消费者这边处理

消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息

rabbitmq怎么做到幂等?

消费者做消息幂等消费

很简单, 对每个消息添加唯一标识, 比如uuid

消费者消费消息前, 将其添加到ConcurrentHashMap<String, Integer>()

其中keyuuid, value是数量, 数量一般是 1

在消息进入消费者前, 我们执行ConcurrentHashMap#putIfAbsent代码, 消息put成功一次之后, 我们才去执行, 如果put发现已经有元素, 那么直接丢弃消息, 说明消费者已经消费了我们的消息

问: 进程崩溃, 保存在内存的ConcurrentHashMap也将消失, 到时候照样会出现消息重复消费

答: 我们还可以考虑使用redissetnx命令

使用redis做幂等

跟上面的步骤差不多, 但是使用到redissetnx命令, 保证原子性, 这样即便消费端重复发送消息, 生产者也只会根据messageId消费一次

需要注意锁的范围, 是针对整个分布式系统呢, 还是针对单个消费者个体

使用redis做幂等引入了新的问题, 那就是redis标记了某个消息已处理, 但是消费者准备执行业务时, 挂了, 我们应该怎么办?

使用redis做幂等的问题

redis 说你消息A的相关业务已经处理过了

但实际上消费者只不过是标记了消息A已处理, 跟消息A相关的业务还没执行, 消费者挂了

在消费者重启后, 会自动加载queue中的消息A, 然后被redis拦截并拒绝掉

被拒绝掉的消息最后会走死信队列

咋办? 凉拌, 也许可以选前面的ConcurrentHashMap方案, 干脆直接在内存中保存着吧

问: 使用 ConcurrenthashMap 不也会导致消息被重新消费吗?

答: 确实, 内存中的记录丢失之后, 同样的消息还是会继续投递过来, 即便业务已经执行过了, 但是在ack函数没有被执行前, 都将在queue中保存着消息, 消费者重启后, 消息还是会被继续投递

优先级队列

为什么需要优先级队列?

队列繁忙的时候, 消息堆积, 一些重要的消息需要优先投递, 此时消息优先级

优先级需要注意的地方

  1. 说到消息的优先级, 我们需要知道队列才是对不同优先级的message进行排序的场所. 因为队列可以存储很多message, 也就能够很方便对不同优先级的message进行排序

  2. RabbitMQ 队列默认情况下不支持优先级。创建优先级队列时,开发人员可以选择认为合适的最大优先级。一般优先级最好为 1~10, 使用更多优先级将消耗更多的 CPU 资源

  3. 没有priority属性的消息其优先级被视为 0。优先级高于队列最大值的消息将被视为以最大优先级发布。

  4. 优先级队列必须和优先级消息一起使用,才能发挥出效果,但是会消耗性能

  5. 非优先级队列发送优先级消息是不会排序的,所以向非优先级队列发送优先级是没有任何作用的

  6. 在普通队列中消息过期就立即被删除, 在优先级队列中, 消息过期时, 如果被fetch, 那么就不会被删除

  7. 如果配置了queue的最大值, 如果queue满了, 不论高优先级的消息还是低优先级的消息, 都会被丢失, 但正常情况应该是低优先级的消息会被丢失, 而不是高优先级

  8. 需要配置Qos为1, 也就是prefetch为1, 如果你不配置, 那么队列中的消息会一下子都给消费者读取出来, 预期值默认好像是150还是250, 也就是说他会一下子读那么多个消息, 而消息队列可能来不及排序

优先级队列怎么使用?

  1. 消费者配置队列最大优先级
  2. 消费者配置预期值
  3. 生产者配置消息优先级
@Bean
public Queue queue() {
   return QueueBuilder.durable(QUEUE)
         .maxPriority(10)
         .build();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

实战

消费端:

public class Consumer {
   public static final String EXCHANGE_NAME = "exchange_name";
   public static final String QUEUE = "queue";
   
   public static void main(String[] args) throws Exception {
      ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      
      channel.basicQos(1);
      
      channel.confirmSelect();
      channel.addConfirmListener(new ConfirmListener() {
         @Override
         public void handleAck(long deliveryTag, boolean multiple) {
            System.err.println("消息已响应, deliveryTag: " + deliveryTag);
         }
         
         @Override
         public void handleNack(long deliveryTag, boolean multiple) {
            System.err.println("消息未响应, deliveryTag: " + deliveryTag);
         }
      });
      
      channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);
//    HashMap<String, Object> arguments = new HashMap<>();
//    arguments.put("x-max-priority", 10);
      Queue queue = QueueBuilder.durable(QUEUE).maxPriority(10).build();
      channel.queueDeclare(queue.getName(), true, false, false, queue.getArguments());
      channel.queueBind(QUEUE, EXCHANGE_NAME, "");
      channel.basicConsume(QUEUE, false, (consumerTag, message) -> {
      try {
         System.err.println("messageContent: "
               + new String(message.getBody(), Charset.defaultCharset())
               + ", priority: " + message.getProperties().getPriority());
      } catch(Exception e) {
          // 省略日志记录: 时间, 消息内容和错误信息等
          channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
      }
      channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
      }, consumerTag -> System.err.println("consumerTag: " + consumerTag));
   }
   
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

核心代码:

channel.basicQos(1);
//    HashMap<String, Object> arguments = new HashMap<>();
//    arguments.put("x-max-priority", 10);
Queue queue = QueueBuilder.durable(QUEUE).maxPriority(10).build();
  • 1
  • 2
  • 3
  • 4

生产端:

public class Producer {
   public static final String EXCHANGE_NAME = "exchange_name";
   
   public static void main(String[] args) throws Exception {
      ConnectionFactory factory = RabbitUtils.INSTANCE.connectionFactory();
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      
      channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);
      
      for (int i = 100; i > 0; i--) {
         var priority = i % 11;
         var props = new AMQP.BasicProperties().builder().priority(priority);
         channel.basicPublish(EXCHANGE_NAME, "", props.build(),
               ("message: " + i).getBytes(Charset.defaultCharset()));
      }
   }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

效果:

// 省略部分打印
messageContent: message: 100, priority: 1
messageContent: message: 98, priority: 10
messageContent: message: 10, priority: 10
messageContent: message: 97, priority: 9
messageContent: message: 86, priority: 9
messageContent: message: 46, priority: 2
messageContent: message: 2, priority: 2
messageContent: message: 89, priority: 1
messageContent: message: 1, priority: 1
messageContent: message: 33, priority: 0
messageContent: message: 22, priority: 0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

如果你使用的是spring boot 项目那么qos应该这么配置:

server:
  port: 8081
spring:
  rabbitmq:
    host: 127.0.0.1
    virtual-host: /
    username: zhazha
    password: 123456
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

惰性队列

是什么?

惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中

为什么?

能够支持更长的队列,即支持更多的消息存储。

当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

怎么用?

//     HashMap<String, Object> arguments = new HashMap<>();
//    arguments.put("x-queue-mode", "lazy");
Queue queue = QueueBuilder.durable(QUEUE)
    .lazy().build();
channel.queueDeclare(queue.getName(), true, false, false, queue.getArguments());
  • 1
  • 2
  • 3
  • 4
  • 5
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/418842
推荐阅读
相关标签
  

闽ICP备14008679号