赞
踩
环境介绍 | |
技术栈 | springboot+mybatis-plus+mysql+rocketmq |
软件 | 版本 |
mysql | 8 |
IDEA | IntelliJ IDEA 2022.2.1 |
JDK | 17 |
Spring Boot | 3.1.7 |
dynamic-datasource | 3.6.1 |
mybatis-plus | 3.5.3.2 |
rocketmq | 4.9.4 |
加入依赖
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- <exclusions><!-- 排除logback依赖 -->
- <exclusion>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-logging</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <!--Log4j2场景启动器 -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-log4j2</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.baomidou</groupId>
- <artifactId>mybatis-plus-boot-starter</artifactId>
- <version>3.5.3</version>
- <exclusions>
- <exclusion>
- <groupId>com.baomidou</groupId>
- <artifactId>mybatis-plus-generator</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.mysql</groupId>
- <artifactId>mysql-connector-j</artifactId>
- <scope>runtime</scope>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>druid-spring-boot-starter</artifactId>
- <version>1.1.14</version>
- </dependency>
- <dependency>
- <groupId>com.baomidou</groupId>
- <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
- <version>3.6.1</version>
- </dependency>
- <dependency>
- <groupId>p6spy</groupId>
- <artifactId>p6spy</artifactId>
- <version>3.9.1</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.9.2</version>
- </dependency>
-
- </dependencies>
消息中间件: activeMQ:java(jms协议),性能一般,吞吐量低。rabbitMQ:erlang(amqp协议),性能好,功能丰富,吞吐量一般。rocketMQ:java,性能好,吞吐量丰富,功能丰富。Kafka: scala,吞吐量最大,功能单一,大数据领域
RocketMQ 是阿里开源的分布式消息中间件,跟其它中间件相比,RocketMQ 的特点是纯JAVA实现,是一套提供了消息生产,存储,消费全过程API的软件系统。
RocketMQ的作用:数据收集、限流削峰、异步解耦
数据收集
分布式系统会产生海量级数据流,如:业务日志、监控数据、用户行为等。针对这些数据流进行实时或批量采集汇总,然后对这些数据流进行大数据分析,这是当前互联网平台的必备技术。通过MQ完成此类数据收集是最好的选择。
限流削峰
MQ可以将系统的超量请求暂存其中,以便系统后期可以慢慢进行处理,从而避免了请求的丢失或系统被压垮。
异步解耦
上游系统对下游系统的调用若为同步调用,则会大大降低系统的吞吐量与并发度,且系统耦合度太高、而异步调用则会解决这些问题。所以两层之间若要实现由同步到异步的转化,一般性做法就是,在这两层间添加一个MQ层。
rocketmq.apache.org
Broker:经纪人(经理人)
Topic主题:消息区分,分类,虚拟结构
Queue:消息队列
Apache RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
1、 生产者多次投递
2、负载均衡模式消费者扩容时重试
解决办法
RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)
msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等,需要控制消息的幂等性),这种情况就需要使业务字段进行重复消费。
幂等性:多次操作产生的结果和第一次操作产生的结果一致。
存储key可用mysql,oracle,redis等数据库做验证
本次解决方案为:将key插入Mysql数据库,创建唯一索引,插入成功执行业务逻辑,插入失败为重复消息
使用Myabtis-plus逆向工程
生产者
- @Test
-
- void repeatProducerTest()throws Exception{
-
- //创建生产者
-
- DefaultMQProducer producer = new DefaultMQProducer("repeatGroup");
-
- //连接namesrv
-
- producer.setNamesrvAddr("192.168.68.133:9876");
-
- //启动
-
- producer.start();
-
- //自身业务key唯一
-
- String Key = UUID.randomUUID().toString();
-
- System.out.println(Key);
-
- //创建消息
-
- Message message = new Message("repeatTopic", null,Key, "重复内存内容".getBytes());
-
- Message message2 = new Message("repeatTopic", null,Key, "重复内存内容".getBytes());
-
- //发送消息
-
- producer.send(message);
-
- producer.send(message2);
-
- System.out.println("发送成功");
-
- //关闭生产者
-
- producer.shutdown();
-
- }
消费者
- @Autowired
-
- private OrderLogMapper orderMapper;
- @Test
-
- void repeatConsumerTest() throws Exception {
-
- //创建消费者
-
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeatConsumerTest");
-
- //连接namesrv
-
- consumer.setNamesrvAddr("192.168.68.133:9876");
-
- //订阅主题 *表示该主题的所有消息
-
- consumer.subscribe("repeatTopic","*");
- //设置监听器(一直,异步回调方式) MessageListenerConcurrently并发模式
-
- consumer.registerMessageListener(new MessageListenerConcurrently() {
-
- //消费方法
-
- @Override
-
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
-
- //获取key
- for (MessageExt messageExt : msgs) {
- String key = messageExt.getKeys();
- //数据库中OrderID创建了唯一索引
- OrderLog orderLog = new OrderLog();
- orderLog.setType(1);
- orderLog.setOrderid(key);
- orderLog.setUsername("测试");
- try {
- orderMapper.insert(orderLog);
- //业务处理
- System.out.println("业务执行");
- //如若业务执行失败则执行删除orderLog操作
- orderMapper.deleteById(1);
-
- }catch (Exception e){
- if(e instanceof SQLIntegrityConstraintViolationException){
- e.printStackTrace();
- System.out.println("重复消费");
- //签收该消息
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
-
- }
- }
-
- //CONSUME_SUCCESS成功 RECONSUME_LATER失败
-
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-
- }
-
- });
-
- //启动
-
- consumer.start();
-
- //挂起当前jvm
-
- System.in.read();
-
- //关闭 consumer.shutdown();
-
- }
问题
***************************
APPLICATION FAILED TO START
***************************
Description:
Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured.
Reason: Failed to determine a suitable driver class
解决办法
1、方法一
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.4.1</version>
<exclusions>
<exclusion>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-generator</artifactId>
</exclusion>
</exclusions>
</dependency>
或
2、方法二
- <dependency>
- <groupId>com.baomidou</groupId>
- <artifactId>mybatis-plus-boot-starter</artifactId>
- <version>3.5.3</version>
- <exclusions>
- <exclusion>
- <artifactId>mybatis-spring</artifactId>
- <groupId>org.mybatis</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.mybatis</groupId>
- <artifactId>mybatis-spring</artifactId>
- <version>3.0.3</version>
- </dependency>
3、方法三
升级dynamic-datasource-spring-boot-starter版本至3.6.1或最新版本
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。