当前位置:   article > 正文

RabbitMQ之幂等性问题处理_rabbitmq解决幂等问题

rabbitmq解决幂等问题

目录

基本介绍

RabbitMQ幂等性问题

如何避免消息的重复消费问题?


基本介绍

消息消费时的幂等性(消息不被重复消费),同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务,否则就处理重复了;

幂等性:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复的请求而对该资源重复造成影响;

接口幂等性是指:一个接口用同样的参数反复调用,不会造成业务错误,那么这个接口就是具有幂等性的,比如:注册接口;发送短信验证码接口;还有比如同一个订单我支付两次,但是只会扣款一次,第二次支付不会扣款,这说明这个支付接口是具有幂等性的。

幂等性是分布式系统设计中的一个重要概念,是在做系统或者接口设计时要着重考虑的问题,尤其像支付宝、银行、互联网金融等涉及钱的系统,既要高效,数据也要准确,绝对不能出现多扣款,多打款等问题,幂等性的设计就显得更为重要了。

RabbitMQ幂等性问题

在RabbitMQ也回出现这个问题,RabbitMQ把消息发送给消费者消费,消费者消费成功并返回ack消息,但这时候网络中断了,RabbitMQ没有收到ack消息,这就让RabbitMQ误以为消息消费失败了,然后RabbitMQ会重新把该条消息发送给其他的消费者,或者等网络重连后再次发送给该候消费者,这时候就会造成重复消费的问题。

  • 生产者重复向RabbitMQ代理的消息队列发送消息

消息发送到代理并持久化后,由于网络断开或者客户端崩溃,代理未能回复客户端,导致生产者认为代理没有收到消息而重新发送,结果消费者收到两条具有相同内容和消息ID的消息

  • RabbitMQ代理向消费者重复传递消息

消息发送给消费者后,由于网络断开等原因,消费者客户端没有向broker返回ACK响应,代理不知道消息是否被消费,为了确保消息至少被消费一次,代理在网络恢复后再次传递消息,结果消费者就收到了两条具有相同内容和消息ID的消息。

如何避免消息的重复消费问题?

全局唯一ID + Redis

生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,使用setnx命令,将messageId作为key放到redis中:setnx(messageId, 1),若返回1,说明之前没有消费过,正常消费;若返回0,说明这条消息之前已消费过,抛弃;

  1. @RabbitListener(queues = "durable-queue")
  2. @RabbitHandler
  3. public void processIde(Message message, Channel channel) throws IOException {
  4. if (stringRedisTemplate.opsForValue().setIfAbsent(message.getMessageProperties().getMessageId(),"1")){
  5. // 业务操作...
  6. System.out.println("消费消息:"+ new String(message.getBody(), "UTF-8"));
  7. // 手动确认
  8. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  9. }
  10. }

 Message

在消息传递的过程中,实际上传递的对象为 org.springframework.amqp.core.Message ,它主要由两部分组成:

  1. MessageProperties // 消息属性

  2. byte[] body // 消息内容

@RabbitListener

使用 @RabbitListener 注解标记方法,当监听到队列 debug 中有消息时则会进行接收并处理

  • 消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)

  • 消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:

    • application/octet-stream:二进制字节数组存储,使用 byte[]
    • application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
    • text/plain:文本数据类型存储,使用 String
    • application/json:JSON 格式,使用 Object、相应类型

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/618491
推荐阅读
相关标签
  

闽ICP备14008679号