当前位置:   article > 正文

RabbitMQ消息幂等性之全局唯一ID_rabbitmq messageid

rabbitmq messageid

一、简介

消息幂等性,其实就是保证同一个消息不被消费者重复消费两次。当消费者消费完消息之后,通常会发送一个ack应答确认信息给生产者,但是这中间有可能因为网络中断等原因,导致生产者未能收到确认消息,由此这条消息将会被 重复发送给其他消费者进行消费,实际上这条消息已经被消费过了,这就是重复消费的问题。

如何避免重复消费的问题?

消费者端实现幂等性,意味着我们的消息永远不会消费多次,即使我们收到了多条一样的消息。通常有两种方式来避免消费重复消费:

  • 消息全局ID或者写个唯一标识(如时间戳、UUID等) :每次消费消息之前根据消息id去判断该消息是否已消费过,如果已经消费过,则不处理这条消息,否则正常消费消息,并且进行入库操作。(消息全局ID作为数据库表的主键,防止重复)
  • 利用Redis的setnx 命令:给消息分配一个全局ID,只要消费过该消息,将 < id,message>以K-V键值对形式写入redis,消费者开始消费前,先去redis中查询有没消费记录即可。

本文将通过一个示例展示第一种方式避免消息重复消费。

二、消息全局ID

示例使用springboot + mysql, 首先得在mysql中创建一张表,用于记录消息是否被消费记录。

【a】数据库创建表语句:

  1. /*
  2. SQLyog Ultimate v11.24 (32 bit)
  3. MySQL - 5.5.44 : Database - rabbitmq_message_idempotent
  4. *********************************************************************
  5. */
  6. /*!40101 SET NAMES utf8 */;
  7. /*!40101 SET SQL_MODE=''*/;
  8. /*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
  9. /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
  10. /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
  11. /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
  12. CREATE DATABASE /*!32312 IF NOT EXISTS*/`rabbitmq_message_idempotent` /*!40100 DEFAULT CHARACTER SET utf8 */;
  13. USE `rabbitmq_message_idempotent`;
  14. /*Table structure for table `message_idempotent` */
  15. DROP TABLE IF EXISTS `message_idempotent`;
  16. CREATE TABLE `message_idempotent` (
  17. `message_id` varchar(50) NOT NULL COMMENT '消息ID',
  18. `message_content` varchar(2000) DEFAULT NULL COMMENT '消息内容',
  19. PRIMARY KEY (`message_id`)
  20. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  21. /*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
  22. /*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
  23. /*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
  24. /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

【b】pom.xml依赖:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>1.5.21.RELEASE</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.wsh.springboot</groupId>
  12. <artifactId>springboot_rabbitmq_message_idempotent</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>springboot_rabbitmq_message_idempotent</name>
  15. <description>Demo project for Spring Boot</description>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. </properties>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-amqp</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-data-jpa</artifactId>
  27. </dependency>
  28. <dependency>
  29. <groupId>org.springframework.boot</groupId>
  30. <artifactId>spring-boot-starter-web</artifactId>
  31. </dependency>
  32. <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
  33. <dependency>
  34. <groupId>org.apache.commons</groupId>
  35. <artifactId>commons-lang3</artifactId>
  36. <version>3.4</version>
  37. </dependency>
  38. <dependency>
  39. <groupId>mysql</groupId>
  40. <artifactId>mysql-connector-java</artifactId>
  41. <scope>runtime</scope>
  42. </dependency>
  43. <dependency>
  44. <groupId>org.springframework.boot</groupId>
  45. <artifactId>spring-boot-starter-test</artifactId>
  46. <scope>test</scope>
  47. </dependency>
  48. </dependencies>
  49. <build>
  50. <plugins>
  51. <plugin>
  52. <groupId>org.springframework.boot</groupId>
  53. <artifactId>spring-boot-maven-plugin</artifactId>
  54. </plugin>
  55. </plugins>
  56. </build>
  57. </project>

【c】application.yml配置文件:

  1. server:
  2. port: 9098
  3. spring:
  4. application:
  5. name: rabbitmq_message_idempotent
  6. rabbitmq:
  7. host: localhost
  8. virtual-host: /vhost
  9. username: wsh
  10. password: wsh
  11. publisher-confirms: true
  12. port: 5672
  13. datasource:
  14. username: root
  15. password: wsh0905
  16. driver-class-name: com.mysql.jdbc.Driver
  17. url: jdbc:mysql://127.0.0.1:3306/rabbitmq_message_idempotent?characterEncoding=utf8
  18. jpa:
  19. database: MySQL
  20. show-sql: true
  21. hibernate:
  22. naming_strategy: org.hibernate.cfg.ImprovedNamingStrategy

【d】rabbitmq配置类:

  1. package com.wsh.springboot.springboot_rabbitmq_message_idempotent.config;
  2. import org.springframework.amqp.core.Binding;
  3. import org.springframework.amqp.core.BindingBuilder;
  4. import org.springframework.amqp.core.Queue;
  5. import org.springframework.amqp.core.TopicExchange;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. /**
  9. * @Description: RabbitMQ配置类
  10. * @Author: weixiaohuai
  11. * @Date: 2019/7/21
  12. * @Time: 14:59
  13. * <p>
  14. */
  15. @Configuration
  16. public class RabbitMQConfig {
  17. private static final String EXCHANGE_NAME = "message_idempotent_exchange";
  18. private static final String QUEUE_NAME = "message_idempotent_queue";
  19. private static final String ROUTE_KEY = "message.#";
  20. /**
  21. * 创建通配符交换机实例
  22. *
  23. * @return 通配符交换机实例
  24. */
  25. @Bean
  26. public TopicExchange topicExchange() {
  27. return new TopicExchange(EXCHANGE_NAME);
  28. }
  29. /**
  30. * 创建队列实例,并持久化
  31. *
  32. * @return 队列实例
  33. */
  34. @Bean
  35. public Queue queue() {
  36. return new Queue(QUEUE_NAME, true);
  37. }
  38. /**
  39. * 绑定队列到交换机
  40. *
  41. * @return 绑定对象
  42. */
  43. @Bean
  44. public Binding binding() {
  45. return BindingBuilder.bind(queue()).to(topicExchange()).with(ROUTE_KEY);
  46. }
  47. }

【e】实体类:

  1. import javax.persistence.*;
  2. import java.io.Serializable;
  3. @Entity
  4. @Table(name = "message_idempotent")
  5. public class MessageIdempotent implements Serializable {
  6. @Id
  7. @Column(name = "message_id")
  8. private String messageId;
  9. @Column(name = "message_content")
  10. private String messageContent;
  11. public String getMessageId() {
  12. return messageId;
  13. }
  14. public void setMessageId(String messageId) {
  15. this.messageId = messageId;
  16. }
  17. public String getMessageContent() {
  18. return messageContent;
  19. }
  20. public void setMessageContent(String messageContent) {
  21. this.messageContent = messageContent;
  22. }
  23. }

【f】repository JPA接口:

  1. import com.wsh.springboot.springboot_rabbitmq_message_idempotent.entity.MessageIdempotent;
  2. import org.springframework.data.jpa.repository.JpaRepository;
  3. import org.springframework.stereotype.Repository;
  4. @Repository
  5. public interface MessageIdempotentRepository extends JpaRepository<MessageIdempotent, String> {
  6. }

【g】消息发送者:

  1. import org.springframework.amqp.core.Message;
  2. import org.springframework.amqp.core.MessageProperties;
  3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Component;
  6. import java.util.UUID;
  7. @Component
  8. public class MessageIdempotentProducer {
  9. private static final String EXCHANGE_NAME = "message_idempotent_exchange";
  10. private static final String ROUTE_KEY = "message.insert";
  11. @Autowired
  12. private RabbitTemplate rabbitTemplate;
  13. /**
  14. * 发送消息
  15. */
  16. public void sendMessage() {
  17. //创建消费对象,并指定全局唯一ID(这里使用UUID,也可以根据业务规则生成,只要保证全局唯一即可)
  18. MessageProperties messageProperties = new MessageProperties();
  19. messageProperties.setMessageId(UUID.randomUUID().toString());
  20. messageProperties.setContentType("text/plain");
  21. messageProperties.setContentEncoding("utf-8");
  22. Message message = new Message("hello,message idempotent!".getBytes(), messageProperties);
  23. rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTE_KEY, message);
  24. }
  25. }

注意:这里通过设置UUID为消息全局ID,当然也可以使用时间戳或者业务标识+UUID都可以,只要保证消息ID唯一即可。

【h】消息消费者:

  1. import com.rabbitmq.client.Channel;
  2. import com.wsh.springboot.springboot_rabbitmq_message_idempotent.entity.MessageIdempotent;
  3. import com.wsh.springboot.springboot_rabbitmq_message_idempotent.repository.MessageIdempotentRepository;
  4. import org.apache.commons.lang3.StringUtils;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. import org.springframework.amqp.core.Message;
  8. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  9. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.stereotype.Component;
  12. import javax.transaction.Transactional;
  13. import java.io.IOException;
  14. import java.nio.charset.StandardCharsets;
  15. /**
  16. * @Description: RabbitMQ消费者
  17. * @Author: weixiaohuai
  18. * @Date: 2019/7/21
  19. * @Time: 14:59
  20. * <p>
  21. */
  22. @Component
  23. public class MessageIdempotentConsumer {
  24. private static final Logger logger = LoggerFactory.getLogger(MessageIdempotentConsumer.class);
  25. @Autowired
  26. private MessageIdempotentRepository messageIdempotentRepository;
  27. @RabbitHandler
  28. //org.springframework.amqp.AmqpException: No method found for class [B 这个异常,并且还无限循环抛出这个异常。
  29. //注意@RabbitListener位置,笔者踩坑,无限报上面的错,还有另外一种解决方案: 配置转换器
  30. @RabbitListener(queues = "message_idempotent_queue")
  31. @Transactional
  32. public void handler(Message message, Channel channel) throws IOException {
  33. /**
  34. * 发送消息之前,根据消息全局ID去数据库中查询该条消息是否已经消费过,如果已经消费过,则不再进行消费。
  35. */
  36. // 获取消息Id
  37. String messageId = message.getMessageProperties().getMessageId();
  38. if (StringUtils.isBlank(messageId)) {
  39. logger.info("获取消费ID为空!");
  40. return;
  41. }
  42. MessageIdempotent messageIdempotent = messageIdempotentRepository.findOne(messageId);
  43. //如果找不到,则进行消费此消息
  44. if (null == messageIdempotent) {
  45. //获取消费内容
  46. String msg = new String(message.getBody(), StandardCharsets.UTF_8);
  47. logger.info("-----获取生产者消息-----------------" + "messageId:" + messageId + ",消息内容:" + msg);
  48. //手动ACK
  49. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  50. //存入到表中,标识该消息已消费
  51. MessageIdempotent idempotent = new MessageIdempotent();
  52. idempotent.setMessageId(messageId);
  53. idempotent.setMessageContent(msg);
  54. messageIdempotentRepository.save(idempotent);
  55. } else {
  56. //如果根据消息ID(作为主键)查询出有已经消费过的消息,那么则不进行消费;
  57. logger.error("该消息已消费,无须重复消费!");
  58. }
  59. }
  60. }

需要注意的是:在消费消息之前,先获取消息ID,然后根据ID去数据库中查询是否存在主键为消息ID的记录,如果存在的话,说明这条消息之前应该是已经被消费过了,那么就不处理这条消息;如果不存在消费记录的话,则消费者进行消费,消费完成发送确认消息,并且将消息记录进行入库。

【i】测试用例:

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class SpringbootRabbitmqMessageIdempotentApplicationTests {
  4. @Autowired
  5. private MessageIdempotentProducer producer;
  6. @Test
  7. public void contextLoads() {
  8. producer.sendMessage();
  9. }
  10. }

【j】运行结果:

  1. Hibernate: select messageide0_.message_id as message_1_0_0_, messageide0_.message_content as message_2_0_0_ from message_idempotent messageide0_ where messageide0_.message_id=?
  2. 2019-07-21 20:26:09.524 INFO 9220 --- [ntContainer#0-1] c.w.s.s.c.MessageIdempotentConsumer : -----获取生产者消息-----------------messageId:aadda3d9-f8ea-4489-bb5b-162d865104b0,消息内容:hello,message idempotent!
  3. Hibernate: select messageide0_.message_id as message_1_0_0_, messageide0_.message_content as message_2_0_0_ from message_idempotent messageide0_ where messageide0_.message_id=?
  4. Hibernate: insert into message_idempotent (message_content, message_id) values (?, ?)

可以看到,消息成功被消费者进行 消费,并且将消费记录存到数据表中,用于后面消费的时候进行判断,这样就可以有效避免消息被重复消费的问题。

  • 思路总结:就是首先我们需要根据消息生成一个全局唯一ID,目的就是为了保障操作是绝对唯一的。将消息全局ID作为数据库表主键,因为主键不可能重复。即在消费消息前,先去数据库查询这条消息是否存在消费记录,没有就执行insert操作,如果有就代表已经被消费了,则不进行处理。

三、总结

以上就是使用全局消息ID避免消息重复消费的问题,这种方式实现起来相对简单,但是缺点也很明显,就是在高并发下,需要频繁读写数据库,无形中增加了数据库的压力。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/763214
推荐阅读
相关标签
  

闽ICP备14008679号