赞
踩
分布式锁是用来解决分布式并发问题的
订单——商品超卖问题
-- ---------------------------- -- Table structure for tb_products -- ---------------------------- CREATE TABLE `tb_products` ( `product_id` int( 0 ) NOT NULL AUTO_INCREMENT, `product_name` varchar( 50 ) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, `product_desc` varchar( 200 ) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, `product_stock` int( 0 ) NULL DEFAULT NULL, PRIMARY KEY (`product_id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 11 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of tb_products -- ---------------------------- INSERT INTO `tb_products` VALUES ( 1 , '小米10', '小米 10 描述信息', 3 ); INSERT INTO `tb_products` VALUES ( 2 , '华为P50', '华为P50描述信息', 12 ); INSERT INTO `tb_products` VALUES ( 3 , '康佳电视', '康佳电视描述信息', 11 ); INSERT INTO `tb_products` VALUES ( 4 , '铁三角麦克风', '铁三角麦克风描述信息', 9 ); INSERT INTO `tb_products` VALUES ( 5 , '海尔冰箱', '海尔冰箱描述信息', 12 ); INSERT INTO `tb_products` VALUES ( 6 , '美的空调', '美的空调描述信息', 6 ); INSERT INTO `tb_products` VALUES ( 7 , '联想笔记本', '联想笔记本描述信息', 6 ); INSERT INTO `tb_products` VALUES ( 8 , '哇哈哈', '哇哈哈描述信息', 6 ); INSERT INTO `tb_products` VALUES ( 9 , 'test', 'testdesc', 6 ); INSERT INTO `tb_products` VALUES ( 10 , '康师傅', '康师傅方便面', 6 ); SET FOREIGN_KEY_CHECKS = 1 ;
100 个并发请求同时购买一个商品——商品超卖
当后端项目单机部署时,使用JVM锁可以解决由于提交订单的并发请求导致的商品超卖问题
@RestController @CrossOrigin @RequestMapping("/product") public class ProductController { @Autowired private ProductService productService; @PostMapping("/addorder") public ResultVO submitOrder(Integer productId, Integer num){ ResultVO resultVO = null; synchronized (productService) { //开启事务 //提交订单业务 resultVO = productService.saveOrder(productId, num); //提交事务 } return resultVO; } }
将项目进行集群部署
并发访问测试
出现了商品超卖:
@Service public class OrderServiceImpl implements OrderService { @Autowired private ProductDAO productDAO; @Value("${server.port}") private int port; @Autowired private StringRedisTemplate stringRedisTemplate; @Override public ResultVO saveOrder(int productId, int num) { //【加锁】:以当前商品的ID为key,向redis执行 setnx 1 aaa // 设置成功则表示加锁成功 // 设置失败则表示加锁失败 boolean b = stringRedisTemplate.boundValueOps("" + productId).setIfAbsent("aaa"); if(b){ try { //加锁成功 //1.根据id查询库存 Product product = productDAO.selectById(productId); //2.判断库存 (✔) if (product.getProductStock() >= num) { //3.保存订单 System.out.println(port + ":------------save order!"); //4.保存商品快照 //System.out.println("------------save order item!"); //5.修改商品库存 int i = productDAO.updateStockById(productId, product.getProductStock() - num); if (i > 0) { return new ResultVO(200, "下单成功!-" + port, null); } else { return new ResultVO(502, "下单失败!-" + port, null); } } else { return new ResultVO(501, "商品库存不足!-" + port, null); } }catch (Exception e){ e.printStackTrace(); } finally { //【释放锁】:del 1 stringRedisTemplate.delete(""+productId); } }else{ return null; } } }
- 阻塞锁:不断尝试获取锁,直到获取到锁为止
- 非阻塞锁:如果获取不到锁就放弃,但可以支持在一定时间段内的重试
——在一段时间内如果没有获取到锁就放弃
自定义阻塞锁的实现
@Service public class OrderServiceImpl implements OrderService { @Autowired private ProductDAO productDAO; @Value("${server.port}") private int port; @Autowired private StringRedisTemplate stringRedisTemplate; @Override public ResultVO saveOrder(int productId, int num) { //【加锁】:以当前商品的ID为key,向redis执行 setnx 1 aaa // 设置成功则表示加锁成功 // 设置失败则表示加锁失败 boolean b = stringRedisTemplate.boundValueOps("" + productId).setIfAbsent("aaa"); //如果b为false(加锁失败),不断的进行尝试 while(!b){ b = stringRedisTemplate.boundValueOps("" + productId).setIfAbsent("aaa"); } try { //加锁成功 //1.根据id查询库存 Product product = productDAO.selectById(productId); //2.判断库存 (✔) if (product.getProductStock() >= num) { //3.保存订单 System.out.println(port + ":------------save order!"); //4.保存商品快照 //System.out.println("------------save order item!"); //5.修改商品库存 int i = productDAO.updateStockById(productId, product.getProductStock() - num); if (i > 0) { return new ResultVO(200, "下单成功!-" + port, null); } else { return new ResultVO(502, "下单失败!-" + port, null); } } else { return new ResultVO(501, "商品库存不足!-" + port, null); } }catch (Exception e){ e.printStackTrace(); } finally { //【释放锁】:del 1 stringRedisTemplate.delete(""+productId); } return null; } }
自定义非阻塞锁的实现
@Service public class OrderServiceImpl implements OrderService { @Autowired private ProductDAO productDAO; @Value("${server.port}") private int port; @Autowired private StringRedisTemplate stringRedisTemplate; @Override public ResultVO saveOrder(int productId, int num) { //【加锁】:以当前商品的ID为key,向redis执行 setnx 1 aaa // 设置成功则表示加锁成功 // 设置失败则表示加锁失败 boolean b = stringRedisTemplate.boundValueOps("" + productId).setIfAbsent("aaa"); //自定义非阻塞锁: 如果加锁失败,则尝试加锁,超时时间为3秒 long beginTime = System.currentTimeMillis(); while(!b && System.currentTimeMillis()<beginTime+3000){ b = stringRedisTemplate.boundValueOps("" + productId).setIfAbsent("aaa"); } if(b) { try { //加锁成功 //1.根据id查询库存 Product product = productDAO.selectById(productId); //2.判断库存 (✔) if (product.getProductStock() >= num) { //3.保存订单 System.out.println(port + ":------------save order!"); //4.保存商品快照 //System.out.println("------------save order item!"); //5.修改商品库存 int i = productDAO.updateStockById(productId, product.getProductStock() - num); if (i > 0) { return new ResultVO(200, "下单成功!-" + port, null); } else { return new ResultVO(502, "下单失败!-" + port, null); } } else { return new ResultVO(501, "商品库存不足!-" + port, null); } } catch (Exception e) { e.printStackTrace(); } finally { //【释放锁】:del 1 stringRedisTemplate.delete("" + productId); } } return new ResultVO(503,"网络超时,请重试",null); } }
公平锁和非公平锁 100 个线程 线程 3 剩下的 99 个要等待,当线程 3 执行结束以后,这 99 个线 程到底谁获取执行权呢?
- 公平锁:按照线程的先后顺序获取锁
- 非公平锁:多个正在等待的线程随机获取锁
问题①:
问题②:
问题③:
解决方案:在对商品进行加锁时,设置过期时间,这样以来即使线程出现故障无法释放锁,在过期时间结束时也会自动“释放锁”
boolean b = stringRedisTemplate.boundValueOps(productId + "")
.setIfAbsent("value",3,TimeUnit.MINUTES);
新的问题 :
当给锁设置了过期时间之后,如果当前线程t1因为特殊原因,在锁过期前没有完成业务执行,将会释放锁,同时其他线程(t2)就可以成功加锁了,当t2加锁成功之后,t1执行结束释放锁就会释放t2的锁,就会导致t2在无锁状态下执行业务。
在加锁的时候,为每个商品设置唯一的value,当释放锁的时候,先查询redis中当前商品锁对应的值与加锁的时候设置的值是否一致,如果一致则释放锁
String value = UUID.randomUUID().toString();
boolean b = stringRedisTemplate.boundValueOps(productId + "")
.setIfAbsent(value,3,TimeUnit.MINUTES);
//查询操作
String v = stringRedisTemplate.boundValueOps(productId + "").get();
if(value.equals(v)){
//删除操作
stringRedisTemplate.delete(productId+"");
}
新的问题 :
当释放锁的时候,在查询并判断“这个锁是当前线程加的锁”成功之后,正要进行删除时锁过期了,并且被其他线程成功加锁,一样会导致当前线程删除其他线程的锁。
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
@Bean
public DefaultRedisScript<List> defaultRedisScript(){
DefaultRedisScript<List> defaultRedisScript = new DefaultRedisScript<>
();
defaultRedisScript.setResultType(List.class);
defaultRedisScript.setScriptSource(new ResourceScriptSource(new
ClassPathResource("unlock.lua")));
return defaultRedisScript;
}
@AutoWired
private DefaultRedisScript defaultRedisScript;
//执行lua脚本
List<String> keys = new ArrayList<>();
keys.add(skuId);
List rs = stringRedisTemplate.execute(defaultRedisScript,keys ,
values.get(skuId));
System.out.println(rs.get(0));
问题:t1的锁因超时而自动释放,导致t2可以成功加锁,t1和t2就可能会出现并发问题
看门狗线程工作原理:
- 监听当前线程锁的过期时间,当锁即将过期时如果业务没有执行结束,则重置锁的过期时间,保证业务线程正常执行的过程中,锁不会过期。
基于Redis+看门狗机制的分布式锁框架
Redisson在基于NIO的Netty框架上,充分的利用了Redis键值数据库提供的一系列优势,在Java实用工具包中常用接口的基础上,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.0</version>
</dependency>
redisson:
addr:
singleAddr:
host: redis://47.96.11.185:6380
password: 123456
database: 0
@Configuration public class RedissonConfig { @Value("${redisson.addr.singleAddr.host}") private String host; @Value("${redisson.addr.singleAddr.password}") private String password; @Value("${redisson.addr.singleAddr.database}") private int database; @Bean public RedissonClient redissonClient(){ Config config = new Config(); config .useSingleServer() .setAddress(host) .setPassword(password) .setDatabase(database); return Redisson.create(config); } }
@Service public class ProductServiceImpl implements ProductService { @Autowired private ProductDAO productDAO; @Autowired private RedissonClient redissonClient; @Transactional synchronized public ResultVO saveOrder(int productId, int num) { ResultVO resultVO = new ResultVO(504,"系统繁忙,请重试",null); //获取锁(公平锁和非公平锁) //RLock lock = redissonClient.getFairLock(productId + "");//获取公平锁 RLock lock = redissonClient.getLock(productId + "");//获取非公平锁 //加锁:在加锁的时候,可以设置是阻塞锁还是非阻塞锁 //lock.lock(); //阻塞锁 try { boolean b = lock.tryLock(3,TimeUnit.SECONDS);//非阻塞锁 if(b){ //业务 } } catch (InterruptedException e) { e.printStackTrace(); }finally { lock.unlock(); } return resultVO; } }
redisson:
addr:
singleAddr:
host: redis://47.96.11.185:6370
password: 12345678
database: 0
@Configuration public class RedissonConfig { @Value("${redisson.addr.singleAddr.host}") private String host; @Value("${redisson.addr.singleAddr.password}") private String password; @Value("${redisson.addr.singleAddr.database}") private int database; @Bean public RedissonClient redissonClient(){ Config config = new Config(); config.useSingleServer() .setAddress(host) .setPassword(password) .setDatabase(database); return Redisson.create(config); } }
redisson:
addr:
cluster:
hosts: redis://47.96.11.185:6370,...,redis://47.96.11.185:6373
password: 12345678
@Configuration public class RedissonConfig { @Value("${redisson.addr.cluster.hosts}") private String hosts; @Value("${redisson.addr.cluster.password}") private String password; /** * 集群模式 * @return */ @Bean public RedissonClient redissonClient(){ Config config = new Config(); config.useClusterServers().addNodeAddress(hosts.split("[,]")) .setPassword(password) .setScanInterval(2000) .setMasterConnectionPoolSize(10000) .setSlaveConnectionPoolSize(10000); return Redisson.create(config); } }
redisson:
addr:
masterAndSlave:
masterhost: redis://47.96.11.185:6370
slavehosts: redis://47.96.11.185:6371,redis://47.96.11.185:6372
password: 12345678
database: 0
@Configuration public class RedissonConfig3 { @Value("${redisson.addr.masterAndSlave.masterhost}") private String masterhost; @Value("${redisson.addr.masterAndSlave.slavehosts}") private String slavehosts; @Value("${redisson.addr.masterAndSlave.password}") private String password; @Value("${redisson.addr.masterAndSlave.database}") private int database; /** * 主从模式 * @return */ @Bean public RedissonClient redissonClient(){ Config config = new Config(); config.useMasterSlaveServers() .setMasterAddress(masterhost) .addSlaveAddress(slavehosts.split("[,]")) .setPassword(password) .setDatabase(database) .setMasterConnectionPoolSize(10000) .setSlaveConnectionPoolSize(10000); return Redisson.create(config); }
1 、互斥性
和我们本地锁一样互斥性是最基本,但是分布式锁需要保证在不同节点的不同线程的互斥。
2 、可重入性
同一个节点上的同一个线程如果获取了锁之后那么也可以再次获取这个锁。
3 、锁超时
和本地锁一样支持锁超时,加锁成功之后设置超时时间,以防止线程故障导致不释放锁,防止死锁。
4 、高效,高可用
加锁和解锁需要高效,同时也需要保证高可用防止分布式锁失效,可以增加降级。redission是基于redis的,redis的故障就会导致redission锁的故障,因此redission支持单节点redis、reids主从、reids集群
5 、支持阻塞和非阻塞
和 ReentrantLock 一样支持 lock 和 trylock 以及 tryLock(long timeOut)。
1 、乐观锁与悲观锁
2 、可重入锁和非可重入锁
3 、公平锁和非公平锁
4 、阻塞锁和非阻塞锁
1 、获取锁——公平锁和非公平锁
//获取公平锁
RLock lock = redissonClient.getFairLock(skuId);
//获取非公平锁
RLock lock = redissonClient.getLock(skuId);
2 、加锁——阻塞锁和非阻塞锁
//阻塞锁(如果加锁成功之后,超时时间为30s;加锁成功开启看门狗,剩5s延长过期时间)
lock.lock();
//阻塞锁(如果加锁成功之后,设置自定义20s的超时时间)
lock.lock(20,TimeUnit.SECONDS);
//非阻塞锁(设置等待时间为3s;如果加锁成功默认超时间为30s)
boolean b = lock.tryLock(3,TimeUnit.SECONDS);
//非阻塞锁(设置等待时间为3s;如果加锁成功设置自定义超时间为20s)
boolean b = lock.tryLock(3,20,TimeUnit.SECONDS);
3 、释放锁
lock.unlock();
4 、应用示例
@Service public class OrderServiceImpl implements OrderService { @Autowired private ProductDAO productDAO; @Value("${server.port}") private int port; @Autowired private RedissonClient redissonClient; @Override public ResultVO saveOrder(int productId, int num) throws InterruptedException { //获取锁: getLock 获取非公平锁 getFairLock 获取公平锁 RLock lock = redissonClient.getFairLock("" + productId); //lock : 阻塞锁 lock.lock(); //tryLock : 非阻塞锁 //boolean b = lock.tryLock(3,TimeUnit.SECONDS); //执行订单业务 try { //加锁成功 //1.根据id查询库存 Product product = productDAO.selectById(productId); //2.判断库存 (✔) if (product.getProductStock() >= num) { //3.保存订单 System.out.println(port + ":------------save order!"); //4.保存商品快照 //System.out.println("------------save order item!"); //5.修改商品库存 int i = productDAO.updateStockById(productId, product.getProductStock() - num); if (i > 0) { return new ResultVO(200, "下单成功!-" + port, null); } else { return new ResultVO(502, "下单失败!-" + port, null); } } else { return new ResultVO(501, "商品库存不足!-" + port, null); } } catch (Exception e) { e.printStackTrace(); } finally { //释放锁 lock.unlock(); } return new ResultVO(503,"网络超时,请重试",null); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。