当前位置:   article > 正文

手搭手RocketMQ重复消费问题_rocketmq-spring-boot-starter 多服消费者如何保证不被重复消费

rocketmq-spring-boot-starter 多服消费者如何保证不被重复消费

环境介绍

技术栈

springboot+mybatis-plus+mysql+rocketmq

软件

版本

mysql

8

IDEA

IntelliJ IDEA 2022.2.1

JDK

17

Spring Boot

3.1.7

dynamic-datasource

3.6.1

mybatis-plus

3.5.3.2

rocketmq

4.9.4

加入依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. <exclusions><!-- 排除logback依赖 -->
  6. <exclusion>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-logging</artifactId>
  9. </exclusion>
  10. </exclusions>
  11. </dependency>
  12. <!--Log4j2场景启动器 -->
  13. <dependency>
  14. <groupId>org.springframework.boot</groupId>
  15. <artifactId>spring-boot-starter-log4j2</artifactId>
  16. </dependency>
  17. <dependency>
  18. <groupId>com.baomidou</groupId>
  19. <artifactId>mybatis-plus-boot-starter</artifactId>
  20. <version>3.5.3</version>
  21. <exclusions>
  22. <exclusion>
  23. <groupId>com.baomidou</groupId>
  24. <artifactId>mybatis-plus-generator</artifactId>
  25. </exclusion>
  26. </exclusions>
  27. </dependency>
  28. <dependency>
  29. <groupId>com.mysql</groupId>
  30. <artifactId>mysql-connector-j</artifactId>
  31. <scope>runtime</scope>
  32. </dependency>
  33. <dependency>
  34. <groupId>org.projectlombok</groupId>
  35. <artifactId>lombok</artifactId>
  36. <optional>true</optional>
  37. </dependency>
  38. <dependency>
  39. <groupId>org.springframework.boot</groupId>
  40. <artifactId>spring-boot-starter-test</artifactId>
  41. <scope>test</scope>
  42. </dependency>
  43. <dependency>
  44. <groupId>com.alibaba</groupId>
  45. <artifactId>druid-spring-boot-starter</artifactId>
  46. <version>1.1.14</version>
  47. </dependency>
  48. <dependency>
  49. <groupId>com.baomidou</groupId>
  50. <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
  51. <version>3.6.1</version>
  52. </dependency>
  53. <dependency>
  54. <groupId>p6spy</groupId>
  55. <artifactId>p6spy</artifactId>
  56. <version>3.9.1</version>
  57. </dependency>
  58. <dependency>
  59. <groupId>org.apache.rocketmq</groupId>
  60. <artifactId>rocketmq-client</artifactId>
  61. <version>4.9.2</version>
  62. </dependency>
  63. </dependencies>

消息中间件的对比

消息中间件: activeMQ:java(jms协议),性能一般,吞吐量低。rabbitMQ:erlang(amqp协议),性能好,功能丰富,吞吐量一般。rocketMQ:java,性能好,吞吐量丰富,功能丰富。Kafka: scala,吞吐量最大,功能单一,大数据领域

RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息生产,存储,消费全过程API的软件系统。

RocketMQ的作用:数据收集、限流削峰、异步解耦

数据收集

分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。

限流削峰

MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。

异步解耦

上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高、而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一个MQ层。

rocketmq.apache.org

Broker:经纪人(经理人)

Topic主题:消息区分,分类,虚拟结构

Queue:消息队列

Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

消费重复问题

1、 生产者多次投递

2、负载均衡模式消费者扩容时重试

解决办法

RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)

msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等,需要控制消息的幂等性),这种情况就需要使业务字段进行重复消费。

幂等性:多次操作产生的结果和第一次操作产生的结果一致。

存储key可用mysql,oracle,redis等数据库做验证

本次解决方案为:将key插入Mysql数据库,创建唯一索引,插入成功执行业务逻辑,插入失败为重复消息

使用Myabtis-plus逆向工程

生产者

  1. @Test
  2. void repeatProducerTest()throws Exception{
  3. //创建生产者
  4. DefaultMQProducer producer = new DefaultMQProducer("repeatGroup");
  5. //连接namesrv
  6. producer.setNamesrvAddr("192.168.68.133:9876");
  7. //启动
  8. producer.start();
  9. //自身业务key唯一
  10. String Key = UUID.randomUUID().toString();
  11. System.out.println(Key);
  12. //创建消息
  13. Message message = new Message("repeatTopic", null,Key, "重复内存内容".getBytes());
  14. Message message2 = new Message("repeatTopic", null,Key, "重复内存内容".getBytes());
  15. //发送消息
  16. producer.send(message);
  17. producer.send(message2);
  18. System.out.println("发送成功");
  19. //关闭生产者
  20. producer.shutdown();
  21. }

消费者

  1. @Autowired
  2. private OrderLogMapper orderMapper;
  3. @Test
  4. void repeatConsumerTest() throws Exception {
  5. //创建消费者
  6. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeatConsumerTest");
  7. //连接namesrv
  8. consumer.setNamesrvAddr("192.168.68.133:9876");
  9. //订阅主题 *表示该主题的所有消息
  10. consumer.subscribe("repeatTopic","*");
  11. //设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式
  12. consumer.registerMessageListener(new MessageListenerConcurrently() {
  13. //消费方法
  14. @Override
  15. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  16. //获取key
  17. for (MessageExt messageExt : msgs) {
  18. String key = messageExt.getKeys();
  19. //数据库中OrderID创建了唯一索引
  20. OrderLog orderLog = new OrderLog();
  21. orderLog.setType(1);
  22. orderLog.setOrderid(key);
  23. orderLog.setUsername("测试");
  24. try {
  25. orderMapper.insert(orderLog);
  26. //业务处理
  27. System.out.println("业务执行");
  28. //如若业务执行失败则执行删除orderLog操作
  29. orderMapper.deleteById(1);
  30. }catch (Exception e){
  31. if(e instanceof SQLIntegrityConstraintViolationException){
  32. e.printStackTrace();
  33. System.out.println("重复消费");
  34. //签收该消息
  35. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  36. }
  37. }
  38. }
  39. //CONSUME_SUCCESS成功 RECONSUME_LATER失败
  40. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  41. }
  42. });
  43. //启动
  44. consumer.start();
  45. //挂起当前jvm
  46. System.in.read();
  47. //关闭 consumer.shutdown();
  48. }

可能遇到的问题

问题

***************************

APPLICATION FAILED TO START

***************************

Description:

Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured.

Reason: Failed to determine a suitable driver class

解决办法

1、方法一

<dependency>

<groupId>com.baomidou</groupId>

<artifactId>mybatis-plus-boot-starter</artifactId>

<version>3.5.4.1</version>

<exclusions>

<exclusion>

<groupId>com.baomidou</groupId>

<artifactId>mybatis-plus-generator</artifactId>

</exclusion>

</exclusions>

</dependency>

2、方法二

  1. <dependency>
  2. <groupId>com.baomidou</groupId>
  3. <artifactId>mybatis-plus-boot-starter</artifactId>
  4. <version>3.5.3</version>
  5. <exclusions>
  6. <exclusion>
  7. <artifactId>mybatis-spring</artifactId>
  8. <groupId>org.mybatis</groupId>
  9. </exclusion>
  10. </exclusions>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.mybatis</groupId>
  14. <artifactId>mybatis-spring</artifactId>
  15. <version>3.0.3</version>
  16. </dependency>

3、方法三

升级dynamic-datasource-spring-boot-starter版本至3.6.1或最新版本

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/713078
推荐阅读
相关标签
  

闽ICP备14008679号