赞
踩
本方案采用rabbitMQ、redis的原因:
- redis 用来存储商品库存信息,用来缓解DB的压力;
- rabbitMQ 来做 redis 和 DB 之间的商品库存数据同步,以及代码解耦;
方案缺点:
缺点1:多了一层MQ,因此有较大概率出现 redis 与 DB 之间的数据同步延迟.
缺点2:要对MQ的可用性做预防
缺点3:如果人为改数据库,那就没法同步了
方案优点:
优点1:可以大幅减少接口的延迟返回的问题
优点2:MQ自身有重试机制,无需人工去写重试代码
优点3:解耦,把查询Mysql和同步Redis完全分离,互不干扰
Spring Boot 2.7.15
JDK8
Redis 3.2.10
RabbitMQ 3.10.2
MySQL 8.0.32
-- Goods table: one row per product together with its stock count.
CREATE TABLE `goods` (
-- Surrogate primary key, generated by AUTO_INCREMENT.
`id` bigint NOT NULL AUTO_INCREMENT,
-- Product name (the column COMMENT literal means "product name"); nullable.
`goodsname` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '商品名称',
-- Stock quantity (the column COMMENT literal means "product stock"); nullable.
`goodsnum` int NULL DEFAULT NULL COMMENT '商品库存',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
pom.xml 的依赖配置:
<dependencies>
    <!-- Spring MVC / embedded web server -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- RabbitMQ (Spring AMQP) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- JDBC support -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>

    <!-- Redis cache access -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
    </dependency>

    <!-- Lombok (compile-time only, not propagated to dependents) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- MyBatis-Plus -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.5.3.2</version>
    </dependency>

    <!-- Test-scoped dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
application.yml 配置文件:
spring:
  # RabbitMQ configuration
  rabbitmq:
    host: localhost
    # Report unroutable messages back to the publisher.
    publisher-returns: true
    # Enable publisher confirms correlated with the sent message
    # (required for the RabbitTemplate confirm callback to fire).
    publisher-confirm-type: correlated
    username: admin
    password: admin
    port: 5672
    listener:
      simple:
        # Listeners must acknowledge each delivery explicitly (channel.basicAck).
        acknowledge-mode: manual
  # MySQL 8.0 configuration
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3399/ry-cloud?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT&allowPublicKeyRetrieval=true
    username: root
    password: qwe123
  # Redis configuration
  redis:
    host: localhost
    port: 6379
    database: 0
    # Empty: no Redis password is configured.
    password:
    timeout: 10s
    lettuce:
      pool:
        min-idle: 0
        max-idle: 8
        max-active: 8
        # Negative value: block without a time limit when the pool is exhausted.
        max-wait: -1ms
GoodsMapper.xml:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!-- SQL statements backing com.example.mysqlredismqdemo.mapper.GoodsMapper. -->
<mapper namespace="com.example.mysqlredismqdemo.mapper.GoodsMapper">

    <!-- Maps each column of the goods table onto the Goods property of the same name. -->
    <resultMap type="com.example.mysqlredismqdemo.entity.Goods" id="GoodsResult">
        <result property="id" column="id" />
        <result property="goodsname" column="goodsname" />
        <result property="goodsnum" column="goodsnum" />
    </resultMap>

    <!-- Returns every row of the goods table. -->
    <select id="selectAll" resultMap="GoodsResult">
        select id,goodsname,goodsnum from goods
    </select>

    <!-- Returns the number of rows in the goods table. -->
    <select id="selectGoodsNum" resultType="Integer">
        SELECT COUNT(id)nums FROM `goods`
    </select>

</mapper>
Mapper的接口文件:
import com.example.mysqlredismqdemo.entity.Goods;

import java.util.List;

/**
 * Data-access interface for goods records.
 *
 * <p>Each method is expected to be bound to the mapper statement of the same id.
 *
 * @author ruoyi
 */
public interface GoodsMapper {

    /**
     * Fetches all goods records.
     *
     * @return {@link List}<{@link Goods}> holding every goods record
     */
    List<Goods> selectAll();

    /**
     * Counts the goods records.
     *
     * @return the number of goods records
     */
    Integer selectGoodsNum();
}
rabbitMQ的队列、交换机配置文件 ------- MQConfig.java:
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; import javax.annotation.Resource; @Configuration public class MQConfig { @Resource private RabbitTemplate rabbitTemplate; @Bean public Queue directQueue1() { return new Queue("queue1",false); } @Bean public Queue directQueue2() { return new Queue("queue2",false); } @Bean public DirectExchange directExchange() { return new DirectExchange("DirectExchange",false,true); } @Bean public Binding bindingDirect() { return BindingBuilder.bind(directQueue1()).to(directExchange()).with("queue1-1"); } @Bean public Binding bindingDirect2() { return BindingBuilder.bind(directQueue2()).to(directExchange()).with("queue2-1"); } @PostConstruct public void confirmCallbackAck() { rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (!ack) System.out.println("发送失败"); else System.out.println("发送成功"); }); } }
redis 工具类 ------- RedisCache.java:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.BoundSetOperations; import org.springframework.data.redis.core.HashOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ValueOperations; import org.springframework.stereotype.Component; import java.util.*; import java.util.concurrent.TimeUnit; /** * spring redis 工具类 * * @author ruoyi **/ @SuppressWarnings(value = {"unchecked", "rawtypes"}) @Component public class RedisCache { @Autowired public RedisTemplate redisTemplate; /** * 缓存基本的对象,Integer、String、实体类等 * * @param key 缓存的键值 * @param value 缓存的值 */ public <T> void setCacheObject(final String key, final T value) { redisTemplate.opsForValue().set(key, value); } /** * 缓存基本的对象,Integer、String、实体类等 * * @param key 缓存的键值 * @param value 缓存的值 * @param timeout 时间 * @param timeUnit 时间颗粒度 */ public <T> void setCacheObject(final String key, final T value, final Integer timeout, final TimeUnit timeUnit) { redisTemplate.opsForValue().set(key, value, timeout, timeUnit); } /** * 设置有效时间 * * @param key Redis键 * @param timeout 超时时间 * @return true=设置成功;false=设置失败 */ public boolean expire(final String key, final long timeout) { return expire(key, timeout, TimeUnit.SECONDS); } /** * 设置有效时间 * * @param key Redis键 * @param timeout 超时时间 * @param unit 时间单位 * @return true=设置成功;false=设置失败 */ public boolean expire(final String key, final long timeout, final TimeUnit unit) { return redisTemplate.expire(key, timeout, unit); } /** * 获得缓存的基本对象。 * * @param key 缓存键值 * @return 缓存键值对应的数据 */ public <T> T getCacheObject(final String key) { ValueOperations<String, T> operation = redisTemplate.opsForValue(); return operation.get(key); } /** * 删除单个对象 * * @param key */ public boolean deleteObject(final String key) { return redisTemplate.delete(key); } /** * 删除集合对象 * * @param collection 多个对象 * @return */ public long deleteObject(final Collection collection) { return 
redisTemplate.delete(collection); } /** * 缓存List数据 * * @param key 缓存的键值 * @param dataList 待缓存的List数据 * @return 缓存的对象 */ public <T> long setCacheList(final String key, final List<T> dataList) { Long count = redisTemplate.opsForList().rightPushAll(key, dataList); return count == null ? 0 : count; } /** * 获得缓存的list对象 * * @param key 缓存的键值 * @return 缓存键值对应的数据 */ public <T> List<T> getCacheList(final String key) { return redisTemplate.opsForList().range(key, 0, -1); } /** * 缓存Set * * @param key 缓存键值 * @param dataSet 缓存的数据 * @return 缓存数据的对象 */ public <T> BoundSetOperations<String, T> setCacheSet(final String key, final Set<T> dataSet) { BoundSetOperations<String, T> setOperation = redisTemplate.boundSetOps(key); Iterator<T> it = dataSet.iterator(); while (it.hasNext()) { setOperation.add(it.next()); } return setOperation; } /** * 获得缓存的set * * @param key * @return */ public <T> Set<T> getCacheSet(final String key) { return redisTemplate.opsForSet().members(key); } /** * 缓存Map * * @param key * @param dataMap */ public <T> void setCacheMap(final String key, final Map<String, T> dataMap) { if (dataMap != null) { redisTemplate.opsForHash().putAll(key, dataMap); } } /** * 缓存Map * * @param key */ public <T> void setOneMap(final String key, Object hashKey, Object value) { redisTemplate.opsForHash().put(key, hashKey, value); } /** * 获得缓存的Map * * @param key * @return */ public <T> Map<String, T> getCacheMap(final String key) { return redisTemplate.opsForHash().entries(key); } /** * 往Hash中存入数据 * * @param key Redis键 * @param hKey Hash键 * @param value 值 */ public <T> void setCacheMapValue(final String key, final String hKey, final T value) { redisTemplate.opsForHash().put(key, hKey, value); } /** * 获取Hash中的数据 * * @param key Redis键 * @param hKey Hash键 * @return Hash中的对象 */ public <T> T getCacheMapValue(final String key, final String hKey) { HashOperations<String, String, T> opsForHash = redisTemplate.opsForHash(); return opsForHash.get(key, hKey); } /** * 获取多个Hash中的数据 * * @param key 
Redis键 * @param hKeys Hash键集合 * @return Hash对象集合 */ public <T> List<T> getMultiCacheMapValue(final String key, final Collection<Object> hKeys) { return redisTemplate.opsForHash().multiGet(key, hKeys); } /** * 获得缓存的基本对象列表 * * @param pattern 字符串前缀 * @return 对象列表 */ public Collection<String> keys(final String pattern) { return redisTemplate.keys(pattern); } }
import com.example.mysqlredismqdemo.entity.Goods; import com.example.mysqlredismqdemo.mapper.GoodsMapper; import com.example.mysqlredismqdemo.utils.RedisCache; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @author wen * @date 2023/09/19 */ @Component @Slf4j public class SystemAddOrderConfig { @Resource private RedisCache redisCache; @Resource private GoodsMapper goodsMapper; @PostConstruct public void addOrder() { redisCache.deleteObject("goods");//删除redis中的商品库存信息 Integer integer = goodsMapper.selectGoodsNum();//获取商品数量 if (integer != 0) { List<Goods> goodsList = goodsMapper.selectAll();//获取商品库存信息 Map goodMap = new HashMap(); goodsList.forEach(c->{ //将数据库的商品库存信息放入redis,存储形式Map: goods:商品id:库存数量 goodMap.put(c.getId(), c.getGoodsnum()); }); redisCache.setCacheMap("goods",goodMap); return; } log.info("有缓存商品库存"); } }
import com.example.mysqlredismqdemo.utils.RedisCache; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; @RestController public class ConfirmOrderController { @Resource private RedisCache redisCache; @Resource private AmqpTemplate amqpTemplate; /** * 下单(扣库存) * * @param orderId 商品编号 * @param number 数量 * @return {@link String} */ @PostMapping("/confirm") public String confirmOrder(Long orderId, Integer number) throws JsonProcessingException { //todo 校验入参和库存 Map<String, Object> goodsMap = redisCache.getCacheMap("goods");//获取商品库存信息 //同步锁 synchronized (this){ Integer goodsnum =(Integer) goodsMap.get(orderId); if (goodsnum == null) { throw new RuntimeException("商品为空"); } if (goodsnum == 0) { throw new RuntimeException("库存不足"); } if (goodsnum < number) { throw new RuntimeException("库存不足"); } //扣库存 redisCache.setOneMap("goods", orderId, goodsnum - number); //将商品信息发送至MQ Map sendMessageMap = new HashMap<>(); sendMessageMap.put("id",orderId); sendMessageMap.put("num",goodsnum - number); ObjectMapper objectMapper = new ObjectMapper(); String s = objectMapper.writeValueAsString(sendMessageMap); amqpTemplate.convertAndSend("DirectExchange", "queue1-1", s); } return "成功"; } }
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class RevieMQ { @RabbitListener(queues = "queue1") public void review(String msg, Message message, Channel channel) throws IOException { System.out.println("监听到队列1发送的消息:"+msg); //todo 同步数据库 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//MQ回调 } }
欢迎大家提出自己的疑惑点
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。