当前位置:   article > 正文

RabbitMQ幂等性、限流、消息确认代码实现_mq限流

mq限流

一、如何保证消息不重复消费

1.简介

消息幂等性,其实就是保证一个消息不管因为什么原因都只会被消费一次,正常情况下,消费者消费完消息之后,会发送一个ack给生产者,但是中途有可能因为网络等原因导致生产者未收到确认信息,导致一条消息被重复消费。

如何避免重复消费的问题

1.使用一个唯一的id。消费之前根据id去判断消息是否被消费过,未被消费则正常使用消息。

2.使用redis只能有一个key的特性。

我们这里只介绍第一种方法,SpringAMQP中给出了这种问题的解决方案。

关于配置问题和入门使用SpringAMQP,大家可以看我之前写的文章。

SpringAMQP快速入门-CSDN博客

2.代码介绍

在这里我们直接展示生产者和消费者代码,看一下具体如何通过代码实现。

生产者代码:

  1. @SpringBootTest
  2. class DemoMqSendApplicationTests {
  3.    @Autowired
  4.    private RabbitTemplate rabbitTemplate;
  5.    @Test
  6.    void contextLoads() {
  7.        //声明交换机
  8.        String exchange="simple.fanout";
  9.        //amqp自带的配置,可以设置唯一id
  10.        MessageProperties properties = new MessageProperties();
  11.        //在这里我们使用uuid作为唯一id来进行幂等性判断
  12.        properties.setMessageId(UUID.randomUUID().toString());
  13.        properties.setContentType("text/plain");
  14.        properties.setContentEncoding("utf-8");
  15.        //将配置和要发送的消息保存一下
  16.        Message message = new Message("hello".getBytes(), properties);
  17.        //进行消息的发送
  18.        rabbitTemplate.convertAndSend(exchange,"", message);
  19.   }
  20. }

监听者代码:

  1. @Component
  2. @Slf4j
  3. public class Listener {
  4.    @RabbitListener(queues = "simple.queue")
  5.    public void listener(Message message, Channel channel) {
  6.        //获取到发送的信息
  7.        byte[] body = message.getBody();
  8.        String msg = new String(body);
  9.        //获取到对应的配置
  10.        MessageProperties properties = message.getMessageProperties();
  11.        //获取到唯一的业务id
  12.        String messageId = properties.getMessageId();
  13.        /**
  14.         * 在这里进行业务判断,如果数据库中有这个id则不进行操作,否则进行操作,保证幂等性
  15.         */
  16.        log.info("listener: {}", msg);
  17.        log.info("listener: {}", message.getMessageProperties().getMessageId());
  18.        
  19.   }
  20. }
listener: hello listenerID: 4b40e96e-2c39-4880-bea9-25d673a57a9a

根据两条日志看到唯一的id就是4b40e96e-2c39-4880-bea9-25d673a57a9a,发送的消息就是hello

拿到id之后,在消费消息前,先去数据库查询这条消息是否存在消费记录,没有就执行insert操作,如果有就代表已经被消费了,则不进行处理。

二、如何保证消息不丢失
1.介绍

消息从生产端到消费端消费要经过3个步骤:

  1. 生产端发送消息到RabbitMQ;

  2. RabbitMQ发送消息到消费端;

  3. 消费端消费这条消息;

在这里不对12进行详细的讲解,发生最多的也就是消费端接收消息丢失。如果想知道12具体操作,以后可以具体介绍。

2.实现
  1. @RabbitListener(queues = "simple.queue2")
  2. public void listener2(Message message, Channel channel) {
  3.    try {
  4.        byte[] body = message.getBody();
  5.        //拿到接收到的消息
  6.        String msg = new String(body, StandardCharsets.UTF_8);
  7.        //获取配置文件
  8.        MessageProperties properties = message.getMessageProperties();
  9.        //获取到唯一标识符
  10.        long tag = properties.getDeliveryTag();
  11.        //消息确认机制
  12.        channel.basicAck(tag, false);
  13.   } catch (IOException e) {
  14.        throw new RuntimeException(e);
  15.   }
  16. }

在这里的手动确认机制我们使用了channel.basicAck来实现,里面有两个参数,分别是一个long类型的唯一标识符和布尔类型的参数。第一个参数对应的就是配置里面的DeliveryTag,每条消息的标识符都是唯一的。布尔类型参数代表的是当为true时就是确认多条消息,false的话就是只确认这一条消息,一般我们都是false。

还需要再配置文件中加上simple:acknowledge-mode: manual就是开启手动确认。

  1. listener:
  2.  simple:
  3.    acknowledge-mode: manual
三、如何进行限流
1.介绍

最后一个问题就是如何进行限流,为什么要限流?防止数据量突然特别大导致服务崩溃。

2.实现

使用channel.basicQos(int prefetchSize, int prefetchCount, boolean global)来实现

参数介绍:

prefetchSize:表示消息的大小,0的话不限制大小

prefetchCount:表示消息的数量

global:true表示该通道下的所有消费者都适用这个策略,而false表示只有当前这一个消费者适用这个策略。

代码实现:

  1. @RabbitListener(queues = "simple.queue2")
  2. public void listener2(Message message, Channel channel) {
  3.    try {
  4.        //开启限流
  5.        channel.basicQos(0,1,false);
  6.        byte[] body = message.getBody();
  7.        //拿到接收到的消息
  8.        String msg = new String(body, StandardCharsets.UTF_8);
  9.        //获取配置文件
  10.        MessageProperties properties = message.getMessageProperties();
  11.        //获取到唯一标识符
  12.        long tag = properties.getDeliveryTag();
  13.        //消息确认机制
  14.        channel.basicAck(tag, false);
  15.   } catch (IOException e) {
  16.        throw new RuntimeException(e);
  17.   }
  18. }

这里的限流表示不限制消息大小,但是限制消息数量,,一次只能发送一条消息。

为什么要限制流量和大小呢?

限流肯定都可以理解,突然的大流量会导致服务崩溃,如果流量不是很多,但是数据量特别大,比如一个人一天吃一个馒头,今天还是吃一个,但是有10公斤重,也会导致服务崩溃。

注意:限流和手动确认机制最好一起使用。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/996902
推荐阅读
相关标签
  

闽ICP备14008679号