赞
踩
消息幂等性,其实就是保证同一个消息不被消费者重复消费两次。当消费者消费完消息之后,通常会发送一个ack应答确认信息给生产者,但是这中间有可能因为网络中断等原因,导致生产者未能收到确认消息,由此这条消息将会被 重复发送给其他消费者进行消费,实际上这条消息已经被消费过了,这就是重复消费的问题。
如何避免重复消费的问题?
消费者端实现幂等性,意味着我们的消息永远不会消费多次,即使我们收到了多条一样的消息。通常有两种方式来避免消费重复消费:
消息全局ID或者写个唯一标识(如时间戳、UUID等) :每次消费消息之前根据消息id去判断该消息是否已消费过,如果已经消费过,则不处理这条消息,否则正常消费消息,并且进行入库操作。(消息全局ID作为数据库表的主键,防止重复)
利用Redis的setnx 命令:给消息分配一个全局ID,只要消费过该消息,将 < id,message>以K-V键值对形式写入redis,消费者开始消费前,先去redis中查询有没消费记录即可。
本文将通过一个示例展示第一种方式避免消息重复消费。
示例使用springboot + mysql, 首先得在mysql中创建一张表,用于记录消息是否被消费记录。
-- ----------------------------
-- Table structure for message_idempotent
-- ----------------------------
DROP TABLE IF EXISTS `message_idempotent`;
CREATE TABLE `message_idempotent` (
`message_id` varchar(50) NOT NULL COMMENT '消息ID',
`message_content` varchar(2000) DEFAULT NULL COMMENT '消息内容',
PRIMARY KEY (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
pom.xml依赖:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.bruceliu.springboot.message</groupId> <artifactId>springboot_rabbitmq_message_idempotent_consumer</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.4</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
application.yml配置文件:
server: port: 9098 spring: application: name: rabbitmq_message_idempotent_producer rabbitmq: host: localhost virtual-host: / username: guest password: guest publisher-confirms: true port: 5672 datasource: username: root password: 123456 driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/rabbitmq_message_idempotent?characterEncoding=utf8 jpa: database: MySQL show-sql: true hibernate: naming_strategy: org.hibernate.cfg.ImprovedNamingStrategy enable_lazy_load_no_trans: true
rabbitmq配置类:
package com.bruceliu.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author bruceliu * @create 2019-10-29 16:36 * @description */ @Configuration public class RabbitMQConfig { private static final String EXCHANGE_NAME = "message_idempotent_exchange"; private static final String QUEUE_NAME = "message_idempotent_queue"; private static final String ROUTE_KEY = "message.#"; /** * 创建通配符交换机实例 * * @return 通配符交换机实例 */ @Bean public TopicExchange topicExchange() { return new TopicExchange(EXCHANGE_NAME); } /** * 创建队列实例,并持久化 * * @return 队列实例 */ @Bean public Queue queue() { return new Queue(QUEUE_NAME, true); } /** * 绑定队列到交换机 * * @return 绑定对象 */ @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(topicExchange()).with(ROUTE_KEY); } }
实体类:
/** * @author bruceliu * @create 2019-10-29 16:37 * @description */ @Entity @Table(name = "message_idempotent") public class MessageIdempotent implements Serializable { @Id @Column(name = "message_id") private String messageId; @Column(name = "message_content") private String messageContent; public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; } public String getMessageContent() { return messageContent; } public void setMessageContent(String messageContent) { this.messageContent = messageContent; } }
repository JPA接口:
/**
* @author bruceliu
* @create 2019-10-29 16:38
* @description
*/
@Repository
public interface MessageIdempotentRepository extends JpaRepository<MessageIdempotent, String> {
}
消息发送者:
/** * @author bruceliu * @create 2019-10-29 16:39 * @description */ @Component public class MessageIdempotentProducer { private static final String EXCHANGE_NAME = "message_idempotent_exchange"; private static final String ROUTE_KEY = "message.insert"; @Autowired private RabbitTemplate rabbitTemplate; /** * 发送消息 */ public void sendMessage() { //创建消费对象,并指定全局唯一ID(这里使用UUID,也可以根据业务规则生成,只要保证全局唯一即可) MessageProperties messageProperties = new MessageProperties(); messageProperties.setMessageId(UUID.randomUUID().toString()); messageProperties.setContentType("text/plain"); messageProperties.setContentEncoding("utf-8"); Message message = new Message("hello,message idempotent!".getBytes(), messageProperties); System.out.println("生产消息:" + message.toString()); rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTE_KEY, message); } }
注意:这里通过设置UUID为消息全局ID,当然也可以使用时间戳或者业务标识+UUID都可以,只要保证消息ID唯一即可。
消息消费者:
package com.bruceliu.consumer; import com.bruceliu.bean.MessageIdempotent; import com.bruceliu.mapper.MessageIdempotentRepository; import com.rabbitmq.client.Channel; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Optional; /** * @author bruceliu * @create 2019-10-29 16:39 * @description */ @Component public class MessageIdempotentConsumer { private static final Logger logger = LoggerFactory.getLogger(MessageIdempotentConsumer.class); @Autowired private MessageIdempotentRepository messageIdempotentRepository; @RabbitHandler //org.springframework.amqp.AmqpException: No method found for class [B 这个异常,并且还无限循环抛出这个异常。 //注意@RabbitListener位置,笔者踩坑,无限报上面的错,还有另外一种解决方案: 配置转换器 @RabbitListener(queues = "message_idempotent_queue") @Transactional public void handler(Message message, Channel channel) throws IOException { /** * 发送消息之前,根据消息全局ID去数据库中查询该条消息是否已经消费过,如果已经消费过,则不再进行消费。 */ // 获取消息Id String messageId = message.getMessageProperties().getMessageId(); if (StringUtils.isBlank(messageId)) { logger.info("获取消费ID为空!"); return; } MessageIdempotent messageIdempotent=null; Optional<MessageIdempotent> list = messageIdempotentRepository.findById(messageId); if(list.isPresent()){ messageIdempotent=list.get(); } //System.out.println("messageIdempotent="+messageIdempotent+"--->messageId:"+messageId); //如果找不到,则进行消费此消息 if (null == messageIdempotent) { //获取消费内容 String msg = new String(message.getBody(), StandardCharsets.UTF_8); logger.info("-----获取生产者消息-------------------->" + "messageId:" + messageId + ",消息内容:" + msg); //手动ACK channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //存入到表中,标识该消息已消费 MessageIdempotent idempotent = new MessageIdempotent(); idempotent.setMessageId(messageId); idempotent.setMessageContent(msg); messageIdempotentRepository.save(idempotent); } else { //如果根据消息ID(作为主键)查询出有已经消费过的消息,那么则不进行消费; logger.error("该消息已消费,无须重复消费!"); } } }
需要注意的是:在消费消息之前,先获取消息ID,然后根据ID去数据库中查询是否存在主键为消息ID的记录,如果存在的话,说明这条消息之前应该是已经被消费过了,那么就不处理这条消息;如果不存在消费记录的话,则消费者进行消费,消费完成发送确认消息,并且将消息记录进行入库。
测试用例:
package com.bruceliu.test; import com.bruceliu.producer.MessageIdempotentProducer; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; /** * @author bruceliu * @create 2019-10-29 16:44 * @description */ @RunWith(SpringRunner.class) @SpringBootTest public class SpringbootRabbitmqMessageIdempotentApplicationTests { @Autowired private MessageIdempotentProducer producer; @Test public void contextLoads() { producer.sendMessage(); } }
运行结果:
Hibernate: select messageide0_.message_id as message_1_0_0_, messageide0_.message_content as message_2_0_0_ from message_idempotent messageide0_ where messageide0_.message_id=?
2019-10-29 17:12:11.175 INFO 6156 --- [tContainer#0-10] c.b.consumer.MessageIdempotentConsumer : -----获取生产者消息-------------------->messageId:25d4f91c-e408-4129-b5e4-f9204f5bd298,消息内容:hello,message idempotent!
Hibernate: select messageide0_.message_id as message_1_0_0_, messageide0_.message_content as message_2_0_0_ from message_idempotent messageide0_ where messageide0_.message_id=?
Hibernate: insert into message_idempotent (message_content, message_id) values (?, ?)
可以看到,消息成功被消费者进行 消费,并且将消费记录存到数据表中,用于后面消费的时候进行判断,这样就可以有效避免消息被重复消费的问题。
思路总结:就是首先我们需要根据消息生成一个全局唯一ID,目的就是为了保障操作是绝对唯一的。将消息全局ID作为数据库表主键,因为主键不可能重复。即在消费消息前,先去数据库查询这条消息是否存在消费记录,没有就执行insert操作,如果有就代表已经被消费了,则不进行处理。
以上就是使用全局消息ID避免消息重复消费的问题,这种方式实现起来相对简单,但是缺点也很明显,就是在高并发下,需要频繁读写数据库,无形中增加了数据库的压力。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。