赞
踩
有一个电商抢购的场景:为了避免超卖的问题,在单服务器环境下,可用通过JVM的锁解决。
@RequestMapping("/deduct_stock")
public String deductStock() {
synchronized (this) {
// 从Redis中取出当前库存
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock + "");
System.out.println("扣减成功,剩余库存:" + realStock);
} else {
System.out.println("扣减失败,库存不足");
}
}
return "end";
}
但随着业务发展,需要使用集群,一个应用需要部署到几台机器上然后做负载均衡,大致如下图:
由于Web服务1~3在三个不同的JVM进程里,而无论是synchronized关键字还是JUC包里的Lock锁,都无法保证不同JVM内共用一个锁对象,因此上面的代码在高并发场景下是一定会出现超卖问题的(使用压测工具JMeter测试即可发现)。这时就不得不使用分布式锁。
基于MySQL数据库的实现方式的核心思想是:在数据库中创建一个表,表中包含方法名等字段,并在方法名字段上创建唯一索引,想要执行某个方法,就使用这个方法名向表中插入数据,成功插入则获取锁,执行完成后删除对应的行数据释放锁。
首先创建一个表:
DROP TABLE IF EXISTS `method_lock`;
CREATE TABLE `method_lock` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`method_name` varchar(64) NOT NULL COMMENT '锁定的方法名',
`desc` varchar(255) NOT NULL COMMENT '备注信息',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_method_name` (`method_name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
获取锁:
想要执行某个方法,就使用这个方法名向表中插入数据:
INSERT INTO method_lock (method_name, desc) VALUES ('methodName', '测试的methodName');
因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。
释放锁:
成功插入则获取锁,执行完成后删除对应的行数据释放锁:
delete from method_lock where method_name ='methodName';
基于数据库实现分布式锁会出现的问题:
ShedLock是一个在分布式环境中使用的定时任务框架,用于解决在分布式环境中的多个实例的相同定时任务在同一时间点重复执行的问题。ShedLock确保计划的任务最多同时执行一次。如果一个任务正在一个节点上执行,它会获得一个锁,该锁将阻止从另一个节点(或线程)执行同一任务。请注意,如果一个任务已经在一个节点上执行,则在其他节点上的执行不会等待,只是将其跳过。简单来说,ShedLock本身只做一件事情:保证一个任务最多同时执行一次。所以如官网所说的,ShedLock不是一个分布式调度器,只是一个锁。
引入依赖:
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-spring</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-provider-jdbc-template</artifactId>
<version>2.2.0</version>
</dependency>
配置文件:
@Configuration @EnableScheduling public class ShedlockConfig { @Bean public LockProvider lockProvider(DataSource dataSource) { return new JdbcTemplateLockProvider(dataSource); } @Bean public ScheduledLockConfiguration scheduledLockConfiguration(LockProvider lockProvider) { return ScheduledLockConfigurationBuilder .withLockProvider(lockProvider) .withPoolSize(10) .withDefaultLockAtMostFor(Duration.ofMinutes(10)) .build(); } }
并在启动类上加入:@EnableSchedulerLock(defaultLockAtMostFor = "PT30S")
MySQL中建表:
通过在需要加锁的方法上加入注解即可使用:
@Component
public class ScheduledLock
{
private static final int lockTime = 1000;
@Scheduled(cron = "0/2 * * * * ?")
@SchedulerLock(name = "TaskScheduler_scheduledTask", lockAtMostFor = lockTime, lockAtLeastFor = lockTime)
public void run()
{
System.out.print(new Date().toString() + "\n");
}
}
这样,无论多少个集群,该方法在同一时间只能执行一次。
主要应用以下两个命令:
SETNX:是【SET if Not eXists】的简写
格式:setnx key value
将key的值设为value,当且仅当key不存在。若给定的key已经存在,则SETNX不做任何动作。
EXPIRE:设置过期时间
expire key timeout:为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。
核心原理在于每个线程都尝试创建一个锁的KEY,如果能创建成功,则获取锁,否则阻塞等待
代码来源:https://blog.csdn.net/nrsc272420199/article/details/106441612
加锁:
public class DistributedLock { private final JedisPool jedisPool; public DistributedLock(JedisPool jedisPool) { this.jedisPool = jedisPool; } /** * 加锁 * @param lockName 锁的key * @param acquireTimeout 获取超时时间 * @param timeout 锁的超时时间 * @return 锁标识 */ public String lockWithTimeout(String lockName, long acquireTimeout, long timeout) { Jedis conn = null; String retIdentifier = null; try { // 获取连接 conn = jedisPool.getResource(); // 随机生成一个value String identifier = UUID.randomUUID().toString(); // 锁名,即key值 String lockKey = "lock:" + lockName; // 超时时间,上锁后超过此时间则自动释放锁 int lockExpire = (int) (timeout / 1000); // 获取锁的超时时间,超过这个时间则放弃获取锁 long end = System.currentTimeMillis() + acquireTimeout; while (System.currentTimeMillis() < end) { if (conn.setnx(lockKey, identifier) == 1) { conn.expire(lockKey, lockExpire); // 返回value值,用于释放锁时间确认 retIdentifier = identifier; return retIdentifier; } // 返回-1代表key没有设置超时时间,为key设置一个超时时间 if (conn.ttl(lockKey) == -1) { conn.expire(lockKey, lockExpire); } try { Thread.sleep(10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } catch (JedisException e) { e.printStackTrace(); } finally { if (conn != null) { conn.close(); } } return retIdentifier; } /** * 释放锁 * @param lockName 锁的key * @param identifier 释放锁的标识 * @return */ public boolean releaseLock(String lockName, String identifier) { Jedis conn = null; String lockKey = "lock:" + lockName; boolean retFlag = false; try { conn = jedisPool.getResource(); while (true) { // 监视lock,准备开始事务 conn.watch(lockKey); // 通过前面返回的value值判断是不是该锁,若是该锁,则删除,释放锁 if (identifier.equals(conn.get(lockKey))) { Transaction transaction = conn.multi(); transaction.del(lockKey); List<Object> results = transaction.exec(); if (results == null) { continue; } retFlag = true; } conn.unwatch(); break; } } catch (JedisException e) { e.printStackTrace(); } finally { if (conn != null) { conn.close(); } } return retFlag; } }
上面代码中的问题在于conn.setnx(lockKey, identifier)
与conn.expire(lockKey, lockExpire)
不是原子操作。
@RequestMapping("/deduct_stock") public String deductStock() { String lockKey = "product"; String clientID = UUID.randomUUID.toString(); try { // 尝试创建一个锁对象,并设置超时时间,避免出现异常,一直无法释放而造成死锁。 Boolean result = stringRedisTemplate.opsForValue() .setIfAbsent(lockKey, clientID, 10, TimeUnit.SECOND); if (!result) { return "系统正忙,请稍后再试"; } int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } return "end"; } finally { // 保证删除的是自己创建的key if (clientID.equals(stringRedisTemplate.opsForValue().get("stock"))) { stringRedisTemplate.delete(lockKey); } } }
上述代码也有一些问题,即当一个线程获取锁后,处理业务的时间过长,锁超过了过期时间而被删除。这是加入了一个线程来创建锁的key便可以获取锁,两个线程同时获取锁,这样显然是不可行的。
解决办法是:创建一个子线程为锁续命。
最好的方法是使用已有的Redission框架实现分布式锁,Redission官网:https://redisson.org/
引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.8.2</version>
</dependency>
单机模式下自动装配Redisson客户端
@Autowired
private RedisProperties redisProperties;
@Bean
public Redisson redisson(){
Config config = new Config();
String redisUrl = String.format("redis://%s:%s",redisProperties.getHost()+"",redisProperties.getPort()+"");
config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword());
config.useSingleServer().setDatabase(0);
return (Redisson) Redisson.create(config);
}
使用Redission后的扣减库存代码
@RestController public class RedisLockController { @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private Redisson redisson; @GetMapping("/deduct-stock") public String DeductStock() { String LOCK_KEY = "deduct-stock-lock"; // 直接通过Redisson获取锁即可 RLock redissonLock = redisson.getLock(LOCK_KEY); // 加锁成功 try { redissonLock.lock(); // 默认超时30s释放锁 int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", realStock + ""); System.out.println("扣减成功,剩余库存:" + realStock); } else { System.out.println("扣减失败,库存不足"); } } finally { redissonLock.unlock(); System.out.println("成功释放锁"); } return "end"; } }
Redisson实现分布式锁的原理:Lua脚本实现
具体实现参考:https://www.cnblogs.com/cjsblog/p/9831423.html
优化一:
上面使用Redission框架的方法也有一定的缺陷,因为Redis的主节点宕机后,切换到从节点时会丢失少量数据,若分布式锁的数据丢失,会造成锁失效。
解决办法是使用Redlock,解决方案参考了Zookeeper的多数派。
方法二:
分布式锁必然影响效率,上述方法都是单线程执行,效率很低。
考虑将数据存放于不同的节点或槽,然后读取数据时按槽进行读取,类似于ConcurrentHashMap的设计理念。
例如一个商品共10000件,分为100份,每份100个,分别加分布式锁即可。
大致思想为:每个客户端对某个方法加锁时,在Zookeeper上与该方法对应的指定节点的目录下,生成一个唯一的临时有序节点。 判断是否获取锁的方式很简单,只需要判断有序节点中序号最小的一个。 当释放锁的时候,只需将这个临时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题。
定义锁:通过Zookeeper上的数据节点来表示一个锁
获取锁:客户端通过调用 create 方法创建表示锁的临时节点,可以认为创建成功的客户端获得了锁,同时可以让没有获得锁的节点在该节点上注册Watcher监听,以便实时监听到lock节点的变更情况
释放锁:以下两种情况都可以让锁释放
Zookeeper分布式锁还可以实现共享锁:
对于读请求:
对于写请求:如果自己不是序号最小的节点,那么等待。
实现分布式锁的代码如下:
public class DistributedLock implements Lock, Watcher { private ZooKeeper zk = null; // 根节点 private String ROOT_LOCK = "/lock_msb"; // 竞争的资源 private String lockName; // 等待的前一个锁 private String WAIT_LOCK; // 当前锁 private String CURRENT_LOCK; // 计数器 private CountDownLatch countDownLatch; private int sessionTimeout = 3000000; private List<Exception> exceptionList = new ArrayList<Exception>(); /** * 配置分布式锁 * @param config 连接的url * @param lockName 竞争资源 */ public DistributedLock(String config, String lockName) { this.lockName = lockName; try { // 连接zookeeper zk = new ZooKeeper(config, sessionTimeout, this); Stat stat = zk.exists(ROOT_LOCK, false); if (stat == null) { // 如果根节点不存在,则创建根节点 zk.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } // 节点监视器 public void process(WatchedEvent event) { if (this.countDownLatch != null) { this.countDownLatch.countDown(); } } public void lock() { if (exceptionList.size() > 0) { throw new LockException(exceptionList.get(0)); } try { if (this.tryLock()) { System.out.println(Thread.currentThread().getName() + " " + lockName + "获得了锁"); return; } else { // 等待锁 waitForLock(WAIT_LOCK, sessionTimeout); } } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } public boolean tryLock() { try { String splitStr = "_lock_"; if (lockName.contains(splitStr)) { throw new LockException("锁名有误"); } // 创建临时有序节点 CURRENT_LOCK = zk.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(CURRENT_LOCK + " 已经创建"); // 取所有子节点 List<String> subNodes = zk.getChildren(ROOT_LOCK, false); // 取出所有lockName的锁 List<String> lockObjects = new ArrayList<String>(); for (String node : subNodes) { String _node = node.split(splitStr)[0]; if (_node.equals(lockName)) { lockObjects.add(node); } } Collections.sort(lockObjects); System.out.println(Thread.currentThread().getName() + " 的锁是 " + CURRENT_LOCK); // 若当前节点为最小节点,则获取锁成功 if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + lockObjects.get(0))) { return true; } // 若不是最小节点,则找到自己的前一个节点 String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1); WAIT_LOCK = lockObjects.get(Collections.binarySearch(lockObjects, prevNode) - 1); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } return false; } public boolean tryLock(long timeout, TimeUnit unit) { try { if (this.tryLock()) { return true; } return waitForLock(WAIT_LOCK, timeout); } catch (Exception e) { e.printStackTrace(); } return false; } // 等待锁 private boolean waitForLock(String prev, long waitTime) throws KeeperException, InterruptedException { // 监听自己的前一个节点 Stat stat = zk.exists(ROOT_LOCK + "/" + prev, true); if (stat != null) { System.out.println(Thread.currentThread().getName() + "等待锁 " + ROOT_LOCK + "/" + prev); this.countDownLatch = new CountDownLatch(1); // 计数等待,若等到前一个节点消失,则precess中进行countDown,停止等待,获取锁 this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS); this.countDownLatch = null; System.out.println(Thread.currentThread().getName() + " 等到了锁"); } return true; } public void unlock() { try { System.out.println("释放锁 " + CURRENT_LOCK); zk.delete(CURRENT_LOCK, -1); CURRENT_LOCK = null; zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } public Condition newCondition() { return null; } public void lockInterruptibly() throws InterruptedException { this.lock(); } public class LockException extends RuntimeException { private static final long serialVersionUID = 1L; public LockException(String e){ super(e); } public LockException(Exception e){ super(e); } } }
MySQL的实现最为简单,但效率低且可靠性差;
Redis保证AP(可用性+分区容错性),主从异步执行;
Zookeeper保证CP(一致性+分区容错性),保证一半以上Follower同步成功,一定能拿到正确数据。
用Zookeeper实现分布式锁的性能不如Redis,但更可靠。
分布式锁 | 优点 | 缺点 |
---|---|---|
Zookeeper | 有等待锁的队列,大大提高抢锁的效率 | 添加和删除节点的效率较低 |
Redis | SETNX和DEL的效率高 | 需要客户端的自旋获取锁,且主从架构可能出现数据丢失的问题 |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。