当前位置:   article > 正文

RabbitMQ消息幂等性_rabbitmq消息接收幂等性是什么意思

rabbitmq消息接收幂等性是什么意思

1.简介

消息幂等性,其实就是保证同一个消息不被消费者重复消费两次。当消费者消费完消息之后,通常会发送一个ack应答确认信息给生产者,但是这中间有可能因为网络中断等原因,导致生产者未能收到确认消息,由此这条消息将会被 重复发送给其他消费者进行消费,实际上这条消息已经被消费过了,这就是重复消费的问题。

如何避免重复消费的问题?

消费者端实现幂等性,意味着我们的消息永远不会消费多次,即使我们收到了多条一样的消息。通常有两种方式来避免消费重复消费:

  • 消息全局ID或者写个唯一标识(如时间戳、UUID等) :每次消费消息之前根据消息id去判断该消息是否已消费过,如果已经消费过,则不处理这条消息,否则正常消费消息,并且进行入库操作。(消息全局ID作为数据库表的主键,防止重复)

  • 利用Redis的setnx 命令:给消息分配一个全局ID,只要消费过该消息,将 < id,message>以K-V键值对形式写入redis,消费者开始消费前,先去redis中查询有没消费记录即可。

本文将通过一个示例展示第一种方式避免消息重复消费。

2.消息全局ID

示例使用springboot + mysql, 首先得在mysql中创建一张表,用于记录消息是否被消费记录。

  • 数据库创建表语句
-- ----------------------------
-- Table structure for message_idempotent
-- ----------------------------
DROP TABLE IF EXISTS `message_idempotent`;
CREATE TABLE `message_idempotent` (
  `message_id` varchar(50) NOT NULL COMMENT '消息ID',
  `message_content` varchar(2000) DEFAULT NULL COMMENT '消息内容',
  PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

pom.xml依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.bruceliu.springboot.message</groupId>
    <artifactId>springboot_rabbitmq_message_idempotent_consumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.4</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

application.yml配置文件:

server:
  port: 9098
spring:
  application:
    name: rabbitmq_message_idempotent_producer
  rabbitmq:
    host: localhost
    virtual-host: /
    username: guest
    password: guest
    publisher-confirms: true
    port: 5672
  datasource:
    username: root
    password: 123456
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/rabbitmq_message_idempotent?characterEncoding=utf8
  jpa:
    database: MySQL
    show-sql: true
    hibernate:
      naming_strategy: org.hibernate.cfg.ImprovedNamingStrategy
      enable_lazy_load_no_trans: true

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

rabbitmq配置类:

package com.bruceliu.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author bruceliu
 * @create 2019-10-29 16:36
 * @description
 */
@Configuration
public class RabbitMQConfig {

    private static final String EXCHANGE_NAME = "message_idempotent_exchange";
    private static final String QUEUE_NAME = "message_idempotent_queue";
    private static final String ROUTE_KEY = "message.#";

    /**
     * 创建通配符交换机实例
     *
     * @return 通配符交换机实例
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    /**
     * 创建队列实例,并持久化
     *
     * @return 队列实例
     */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true);
    }

    /**
     * 绑定队列到交换机
     *
     * @return 绑定对象
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(topicExchange()).with(ROUTE_KEY);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

实体类:

/**
 * @author bruceliu
 * @create 2019-10-29 16:37
 * @description
 */
@Entity
@Table(name = "message_idempotent")
public class MessageIdempotent implements Serializable {

    @Id
    @Column(name = "message_id")
    private String messageId;

    @Column(name = "message_content")
    private String messageContent;

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getMessageContent() {
        return messageContent;
    }

    public void setMessageContent(String messageContent) {
        this.messageContent = messageContent;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

repository JPA接口:

/**
 * @author bruceliu
 * @create 2019-10-29 16:38
 * @description
 */
@Repository
public interface MessageIdempotentRepository extends JpaRepository<MessageIdempotent, String> {


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

消息发送者:

/**
 * @author bruceliu
 * @create 2019-10-29 16:39
 * @description
 */
@Component
public class MessageIdempotentProducer {
    private static final String EXCHANGE_NAME = "message_idempotent_exchange";

    private static final String ROUTE_KEY = "message.insert";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     */
    public void sendMessage() {
        //创建消费对象,并指定全局唯一ID(这里使用UUID,也可以根据业务规则生成,只要保证全局唯一即可)
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(UUID.randomUUID().toString());
        messageProperties.setContentType("text/plain");
        messageProperties.setContentEncoding("utf-8");
        Message message = new Message("hello,message idempotent!".getBytes(), messageProperties);
        System.out.println("生产消息:" + message.toString());
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTE_KEY, message);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

注意:这里通过设置UUID为消息全局ID,当然也可以使用时间戳或者业务标识+UUID都可以,只要保证消息ID唯一即可。

消息消费者:

package com.bruceliu.consumer;

import com.bruceliu.bean.MessageIdempotent;
import com.bruceliu.mapper.MessageIdempotentRepository;
import com.rabbitmq.client.Channel;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

/**
 * @author bruceliu
 * @create 2019-10-29 16:39
 * @description
 */
@Component
public class MessageIdempotentConsumer {

    private static final Logger logger = LoggerFactory.getLogger(MessageIdempotentConsumer.class);

    @Autowired
    private MessageIdempotentRepository messageIdempotentRepository;

    @RabbitHandler
    //org.springframework.amqp.AmqpException: No method found for class [B 这个异常,并且还无限循环抛出这个异常。
    //注意@RabbitListener位置,笔者踩坑,无限报上面的错,还有另外一种解决方案: 配置转换器
    @RabbitListener(queues = "message_idempotent_queue")
    @Transactional
    public void handler(Message message, Channel channel) throws IOException {
        /**
         * 发送消息之前,根据消息全局ID去数据库中查询该条消息是否已经消费过,如果已经消费过,则不再进行消费。
         */

        // 获取消息Id
        String messageId = message.getMessageProperties().getMessageId();
        if (StringUtils.isBlank(messageId)) {
            logger.info("获取消费ID为空!");
            return;
        }
        MessageIdempotent messageIdempotent=null;
        Optional<MessageIdempotent> list = messageIdempotentRepository.findById(messageId);
        if(list.isPresent()){
            messageIdempotent=list.get();
        }
        //System.out.println("messageIdempotent="+messageIdempotent+"--->messageId:"+messageId);
        //如果找不到,则进行消费此消息
        if (null == messageIdempotent) {
            //获取消费内容
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            logger.info("-----获取生产者消息-------------------->" + "messageId:" + messageId + ",消息内容:" + msg);
            //手动ACK
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

            //存入到表中,标识该消息已消费
            MessageIdempotent idempotent = new MessageIdempotent();
            idempotent.setMessageId(messageId);
            idempotent.setMessageContent(msg);
            messageIdempotentRepository.save(idempotent);
        } else {
            //如果根据消息ID(作为主键)查询出有已经消费过的消息,那么则不进行消费;
            logger.error("该消息已消费,无须重复消费!");
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74

需要注意的是:在消费消息之前,先获取消息ID,然后根据ID去数据库中查询是否存在主键为消息ID的记录,如果存在的话,说明这条消息之前应该是已经被消费过了,那么就不处理这条消息;如果不存在消费记录的话,则消费者进行消费,消费完成发送确认消息,并且将消息记录进行入库。

测试用例:

package com.bruceliu.test;

import com.bruceliu.producer.MessageIdempotentProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author bruceliu
 * @create 2019-10-29 16:44
 * @description
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootRabbitmqMessageIdempotentApplicationTests {

    @Autowired
    private MessageIdempotentProducer producer;

    @Test
    public void contextLoads() {
        producer.sendMessage();
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

运行结果:

Hibernate: select messageide0_.message_id as message_1_0_0_, messageide0_.message_content as message_2_0_0_ from message_idempotent messageide0_ where messageide0_.message_id=?
2019-10-29 17:12:11.175  INFO 6156 --- [tContainer#0-10] c.b.consumer.MessageIdempotentConsumer   : -----获取生产者消息-------------------->messageId:25d4f91c-e408-4129-b5e4-f9204f5bd298,消息内容:hello,message idempotent!
Hibernate: select messageide0_.message_id as message_1_0_0_, messageide0_.message_content as message_2_0_0_ from message_idempotent messageide0_ where messageide0_.message_id=?
Hibernate: insert into message_idempotent (message_content, message_id) values (?, ?)
  • 1
  • 2
  • 3
  • 4

可以看到,消息成功被消费者进行 消费,并且将消费记录存到数据表中,用于后面消费的时候进行判断,这样就可以有效避免消息被重复消费的问题。

思路总结:就是首先我们需要根据消息生成一个全局唯一ID,目的就是为了保障操作是绝对唯一的。将消息全局ID作为数据库表主键,因为主键不可能重复。即在消费消息前,先去数据库查询这条消息是否存在消费记录,没有就执行insert操作,如果有就代表已经被消费了,则不进行处理。

3.总结

以上就是使用全局消息ID避免消息重复消费的问题,这种方式实现起来相对简单,但是缺点也很明显,就是在高并发下,需要频繁读写数据库,无形中增加了数据库的压力。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/440488
推荐阅读
相关标签
  

闽ICP备14008679号