赞
踩
消息幂等性,其实就是保证同一个消息不被消费者重复消费两次。当消费者消费完消息之后,通常会发送一个ack应答确认信息给生产者,但是这中间有可能因为网络中断等原因,导致生产者未能收到确认消息,由此这条消息将会被 重复发送给其他消费者进行消费,实际上这条消息已经被消费过了,这就是重复消费的问题。
如何避免重复消费的问题?
消费者端实现幂等性,意味着我们的消息永远不会消费多次,即使我们收到了多条一样的消息。通常有两种方式来避免消费重复消费:
本文将通过一个示例展示第一种方式避免消息重复消费。
示例使用springboot + mysql, 首先得在mysql中创建一张表,用于记录消息是否被消费记录。
【a】数据库创建表语句:
- /*
- SQLyog Ultimate v11.24 (32 bit)
- MySQL - 5.5.44 : Database - rabbitmq_message_idempotent
- *********************************************************************
- */
-
-
- /*!40101 SET NAMES utf8 */;
-
- /*!40101 SET SQL_MODE=''*/;
-
- /*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
- /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
- /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
- /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
- CREATE DATABASE /*!32312 IF NOT EXISTS*/`rabbitmq_message_idempotent` /*!40100 DEFAULT CHARACTER SET utf8 */;
-
- USE `rabbitmq_message_idempotent`;
-
- /*Table structure for table `message_idempotent` */
-
- DROP TABLE IF EXISTS `message_idempotent`;
-
- CREATE TABLE `message_idempotent` (
- `message_id` varchar(50) NOT NULL COMMENT '消息ID',
- `message_content` varchar(2000) DEFAULT NULL COMMENT '消息内容',
- PRIMARY KEY (`message_id`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-
- /*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
- /*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
- /*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
- /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
【b】pom.xml依赖:
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>1.5.21.RELEASE</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <groupId>com.wsh.springboot</groupId>
- <artifactId>springboot_rabbitmq_message_idempotent</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <name>springboot_rabbitmq_message_idempotent</name>
- <description>Demo project for Spring Boot</description>
-
- <properties>
- <java.version>1.8</java.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-jpa</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <version>3.4</version>
- </dependency>
-
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
- </project>
【c】application.yml配置文件:
- server:
- port: 9098
- spring:
- application:
- name: rabbitmq_message_idempotent
- rabbitmq:
- host: localhost
- virtual-host: /vhost
- username: wsh
- password: wsh
- publisher-confirms: true
- port: 5672
- datasource:
- username: root
- password: wsh0905
- driver-class-name: com.mysql.jdbc.Driver
- url: jdbc:mysql://127.0.0.1:3306/rabbitmq_message_idempotent?characterEncoding=utf8
- jpa:
- database: MySQL
- show-sql: true
- hibernate:
- naming_strategy: org.hibernate.cfg.ImprovedNamingStrategy
【d】rabbitmq配置类:
- package com.wsh.springboot.springboot_rabbitmq_message_idempotent.config;
-
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.core.TopicExchange;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * @Description: RabbitMQ配置类
- * @Author: weixiaohuai
- * @Date: 2019/7/21
- * @Time: 14:59
- * <p>
- */
- @Configuration
- public class RabbitMQConfig {
- private static final String EXCHANGE_NAME = "message_idempotent_exchange";
- private static final String QUEUE_NAME = "message_idempotent_queue";
- private static final String ROUTE_KEY = "message.#";
-
- /**
- * 创建通配符交换机实例
- *
- * @return 通配符交换机实例
- */
- @Bean
- public TopicExchange topicExchange() {
- return new TopicExchange(EXCHANGE_NAME);
- }
-
- /**
- * 创建队列实例,并持久化
- *
- * @return 队列实例
- */
- @Bean
- public Queue queue() {
- return new Queue(QUEUE_NAME, true);
- }
-
- /**
- * 绑定队列到交换机
- *
- * @return 绑定对象
- */
- @Bean
- public Binding binding() {
- return BindingBuilder.bind(queue()).to(topicExchange()).with(ROUTE_KEY);
- }
-
- }
【e】实体类:
- import javax.persistence.*;
- import java.io.Serializable;
-
- @Entity
- @Table(name = "message_idempotent")
- public class MessageIdempotent implements Serializable {
- @Id
- @Column(name = "message_id")
- private String messageId;
- @Column(name = "message_content")
- private String messageContent;
-
- public String getMessageId() {
- return messageId;
- }
-
- public void setMessageId(String messageId) {
- this.messageId = messageId;
- }
-
- public String getMessageContent() {
- return messageContent;
- }
-
- public void setMessageContent(String messageContent) {
- this.messageContent = messageContent;
- }
- }
【f】repository JPA接口:
- import com.wsh.springboot.springboot_rabbitmq_message_idempotent.entity.MessageIdempotent;
- import org.springframework.data.jpa.repository.JpaRepository;
- import org.springframework.stereotype.Repository;
-
- @Repository
- public interface MessageIdempotentRepository extends JpaRepository<MessageIdempotent, String> {
-
-
- }
【g】消息发送者:
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageProperties;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import java.util.UUID;
-
- @Component
- public class MessageIdempotentProducer {
- private static final String EXCHANGE_NAME = "message_idempotent_exchange";
- private static final String ROUTE_KEY = "message.insert";
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- /**
- * 发送消息
- */
- public void sendMessage() {
- //创建消费对象,并指定全局唯一ID(这里使用UUID,也可以根据业务规则生成,只要保证全局唯一即可)
- MessageProperties messageProperties = new MessageProperties();
- messageProperties.setMessageId(UUID.randomUUID().toString());
- messageProperties.setContentType("text/plain");
- messageProperties.setContentEncoding("utf-8");
- Message message = new Message("hello,message idempotent!".getBytes(), messageProperties);
- rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTE_KEY, message);
- }
-
- }
注意:这里通过设置UUID为消息全局ID,当然也可以使用时间戳或者业务标识+UUID都可以,只要保证消息ID唯一即可。
【h】消息消费者:
- import com.rabbitmq.client.Channel;
- import com.wsh.springboot.springboot_rabbitmq_message_idempotent.entity.MessageIdempotent;
- import com.wsh.springboot.springboot_rabbitmq_message_idempotent.repository.MessageIdempotentRepository;
- import org.apache.commons.lang3.StringUtils;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import javax.transaction.Transactional;
- import java.io.IOException;
- import java.nio.charset.StandardCharsets;
-
- /**
- * @Description: RabbitMQ消费者
- * @Author: weixiaohuai
- * @Date: 2019/7/21
- * @Time: 14:59
- * <p>
- */
- @Component
- public class MessageIdempotentConsumer {
-
- private static final Logger logger = LoggerFactory.getLogger(MessageIdempotentConsumer.class);
-
- @Autowired
- private MessageIdempotentRepository messageIdempotentRepository;
-
- @RabbitHandler
- //org.springframework.amqp.AmqpException: No method found for class [B 这个异常,并且还无限循环抛出这个异常。
- //注意@RabbitListener位置,笔者踩坑,无限报上面的错,还有另外一种解决方案: 配置转换器
- @RabbitListener(queues = "message_idempotent_queue")
- @Transactional
- public void handler(Message message, Channel channel) throws IOException {
- /**
- * 发送消息之前,根据消息全局ID去数据库中查询该条消息是否已经消费过,如果已经消费过,则不再进行消费。
- */
-
- // 获取消息Id
- String messageId = message.getMessageProperties().getMessageId();
- if (StringUtils.isBlank(messageId)) {
- logger.info("获取消费ID为空!");
- return;
- }
- MessageIdempotent messageIdempotent = messageIdempotentRepository.findOne(messageId);
- //如果找不到,则进行消费此消息
- if (null == messageIdempotent) {
- //获取消费内容
- String msg = new String(message.getBody(), StandardCharsets.UTF_8);
- logger.info("-----获取生产者消息-----------------" + "messageId:" + messageId + ",消息内容:" + msg);
- //手动ACK
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
-
- //存入到表中,标识该消息已消费
- MessageIdempotent idempotent = new MessageIdempotent();
- idempotent.setMessageId(messageId);
- idempotent.setMessageContent(msg);
- messageIdempotentRepository.save(idempotent);
- } else {
- //如果根据消息ID(作为主键)查询出有已经消费过的消息,那么则不进行消费;
- logger.error("该消息已消费,无须重复消费!");
- }
- }
-
- }
需要注意的是:在消费消息之前,先获取消息ID,然后根据ID去数据库中查询是否存在主键为消息ID的记录,如果存在的话,说明这条消息之前应该是已经被消费过了,那么就不处理这条消息;如果不存在消费记录的话,则消费者进行消费,消费完成发送确认消息,并且将消息记录进行入库。
【i】测试用例:
-
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class SpringbootRabbitmqMessageIdempotentApplicationTests {
-
- @Autowired
- private MessageIdempotentProducer producer;
-
- @Test
- public void contextLoads() {
- producer.sendMessage();
- }
-
- }
【j】运行结果:
- Hibernate: select messageide0_.message_id as message_1_0_0_, messageide0_.message_content as message_2_0_0_ from message_idempotent messageide0_ where messageide0_.message_id=?
- 2019-07-21 20:26:09.524 INFO 9220 --- [ntContainer#0-1] c.w.s.s.c.MessageIdempotentConsumer : -----获取生产者消息-----------------messageId:aadda3d9-f8ea-4489-bb5b-162d865104b0,消息内容:hello,message idempotent!
- Hibernate: select messageide0_.message_id as message_1_0_0_, messageide0_.message_content as message_2_0_0_ from message_idempotent messageide0_ where messageide0_.message_id=?
- Hibernate: insert into message_idempotent (message_content, message_id) values (?, ?)
可以看到,消息成功被消费者进行 消费,并且将消费记录存到数据表中,用于后面消费的时候进行判断,这样就可以有效避免消息被重复消费的问题。
以上就是使用全局消息ID避免消息重复消费的问题,这种方式实现起来相对简单,但是缺点也很明显,就是在高并发下,需要频繁读写数据库,无形中增加了数据库的压力。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。