赞
踩
分布式锁其实就是控制分布式系统中不同进程共同访问共享资源的一种锁的实现。在分布式系统中各个微服务都是独立部署在不同的服务器上,如果多个服务同时操作同一个共享资源的话,就不能像单体服务那样通过synchronized或者Lock等同步机制保证一个代码块在同一时间只能由一个线程访问来实现共享资源的安全性。因为分布式系统中的不同服务已经不在是多线程之间的并发访问了,而是属于多进程之间的并发访问,所以就需要一种更加高级的锁机制,来处理这种跨JVM进程之间的线程安全问题。
在 Java 中,实现分布式锁的方案有多种,常见的几种方案如下:
(1)setnx + expire:这种方式加锁操作和设置超时时间是分开的。如果在执行完setnx加锁后,正要执行expire设置过期时间时,进程挂掉了,那这个锁就永远不会过期了。
(2)set的扩展命令:通过set(String key, String value, String nxxx, String expx, int time) 加锁的同时设置过期时间,再通过del(key)删除key。这种方式可能导致锁被别的线程误删,假设A获取锁后,由于业务还没执行完就过期释放了,然后立即就被B获取该锁执行业务逻辑,此时A执行完成后就会去释放这个锁,但此时这个锁已经被B占用了,也就是说A此时把B的锁给释放掉了。
(3)set的扩展命令+唯一值校验:通过set(String key, String value, String nxxx, String expx, int time) 加锁的同时设置过期时间,再通过Lua 脚本去根据唯一值删除key。这种方式可以解决误删除别人的锁问题,但是还是存在锁过期释放了,业务还没执行完的问题。
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
@Configuration public class RedisConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private int port; @Value("${spring.redis.timeout}") private int timeout; @Value("${spring.redis.password}") private String password; @Value("${spring.redis.pool.maxTotal}") private int maxTotal; @Value("${spring.redis.pool.maxWait}") private int maxWait; @Value("${spring.redis.pool.maxIdle}") private int maxIdle; @Value("${spring.redis.pool.minIdle}") private int minIdle; @Value("${spring.redis.blockWhenExhausted}") private Boolean blockWhenExhausted; @Value("${spring.redis.JmxEnabled}") private Boolean JmxEnabled; /** * 创建JedisPool实例 * * @return JedisPool */ @RefreshScope @Bean public JedisPool jedisPoolFactory() { JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(maxTotal); jedisPoolConfig.setMaxIdle(maxIdle); jedisPoolConfig.setMinIdle(minIdle); jedisPoolConfig.setMaxWaitMillis(maxWait); // 连接耗尽时是否阻塞, false报异常,true阻塞直到超时, 默认true jedisPoolConfig.setBlockWhenExhausted(blockWhenExhausted); // 是否启用pool的jmx管理功能, 默认true jedisPoolConfig.setJmxEnabled(JmxEnabled); return new JedisPool(jedisPoolConfig, host, port, timeout, password); } /** * redisTemplate 序列化使用的jdkSerializeable, 存储二进制字节码, 所以自定义序列化类 * * @return redisTemplate */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); // 使用Jackson2JsonRedisSerialize 替换默认序列化 @SuppressWarnings({ "rawtypes", "unchecked" }) Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); // 设置value的序列化规则和 key的序列化规则 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); redisTemplate.afterPropertiesSet(); return redisTemplate; } }
@Slf4j @Component public class RedisUtil { @Resource private RedisTemplate<String, Object> redisTemplate; @Resource private StringRedisTemplate stringRedisTemplate; private static final String LOCK_SUCCESS = "OK"; /** * NX: 仅在键不存在时设置键 * XX: 只有在键已存在时才设置 */ private static final String SET_IF_NOT_EXIST = "NX"; /** * 过期时间单位 * EX: seconds * PX: milliseconds */ private static final String SET_WITH_EXPIRE_TIME = "EX"; private static final Long RELEASE_SUCCESS = 1L; /** * 尝试获取分布式锁 * * @param lockKey 分布式锁的key,想要获取锁时,判断这个key是否存在于redis中,存在则说明获取分布式锁失败,否则成功获取锁 * @param requestId 每个请求的全局唯一id,用于释放锁时只能释放自己持有的锁 * @param expireTime 超期时间,单位:秒 * @return boolean 是否获取成功 */ public boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) { if (jedis == null) { return false; } String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { log.info("========================================获取分布式锁成功, lockKey is:{}, requestId is:{}", lockKey, requestId); return true; } log.info("========================================获取分布式锁失败, lockKey is:{}, requestId is:{}", lockKey, requestId); return false; } /** * 释放分布式锁 * * @param jedis Redis客户端 * @param lockKey 分布式锁的key * @param requestId 每个请求的全局唯一id * @return 是否释放成功 */ public boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) { if (jedis == null) { log.info("========================================分布式锁释放失败,Jedis为空, lockKey is:{}, requestId is:{}", lockKey, requestId); return false; } // 通过Lua 脚本保证只释放requestId对应的lockKey String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { log.info("========================================分布式锁释放成功, lockKey is:{}, requestId is:{}", lockKey, requestId); return true; } log.info("========================================分布式锁释放失败, lockKey is:{}, requestId is:{}", lockKey, requestId); return false; } }
/** * 通过分布式锁来生成全局订单唯一的id * * @param type 订单类型 * @return String 订单唯一的id */ @Override public String generate(String type) { String orderId = null; log.info("========================================要生成的订单类型为:{}", type); if (StringUtils.isBlank(type)) { return null; } // 开始获取分布式锁 Jedis jedis = jedisPool.getResource(); String lockKey = redisUtils.getRedisKey(RedisTemplateConstant.DISTRIBUTED_LOCK_KEY_TYPE, type); String requestId = CommonUtil.getUUID(); try { if (redisUtils.tryGetDistributedLock(jedis, lockKey, requestId, expireTime)) { // 生成订单id orderId = getOrderId(type); } } catch (Exception e) { log.info("========================================组装id出错================================"); } finally { // 释放分布式锁 redisUtils.releaseDistributedLock(jedis, lockKey, requestId); if (jedis != null) { jedis.close(); } } return orderId; }
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.20.0</version>
</dependency>
@Slf4j @Component public class RedissonUtil { @Resource private Redisson redisson; /** * 加锁 * * @param key 分布式锁的 key * @param timeout 超时时间 * @param unit 时间单位 * @return */ public boolean tryLock(String key, long timeout, TimeUnit unit) { RLock lock = redisson.getLock(key); try { return lock.tryLock(timeout, unit); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; } } /** * 释放分布式锁 * * @param key 分布式锁的 key */ public void unlock(String key) { RLock lock = redisson.getLock(key); lock.unlock(); } }
<!-- 若使用redisTemplate作为分布式锁底层,则需要引入 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>lock4j-redis-template-spring-boot-starter</artifactId>
<version>2.2.4</version>
</dependency>
<!-- 若使用redisson作为分布式锁底层,则需要引入 -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>lock4j-redisson-spring-boot-starter</artifactId>
<version>2.2.4</version>
</dependency>
@Slf4j @RestController @RequestMapping("/redisLock") public class RedisLockController { @Autowired private LockTemplate lockTemplate; /** * 使用 lock4j 注解加锁 */ @Lock4j(keys = {"#key"}, acquireTimeout = 1000, expire = 10000) @GetMapping("/testAnnotate") public R<String> testAnnotate(String key) { try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } return R.ok(key); } /** * 使用LockTemplate模板加锁 */ @GetMapping("/testLock4jLockTemplate") public R<String> testLock4jLockTemplate(String key) { final LockInfo lockInfo = lockTemplate.lock(key, 30000L, 5000L, RedissonLockExecutor.class); if (null == lockInfo) { throw new RuntimeException("业务繁忙,请稍后再试!"); } // 获取锁成功,处理业务 try { try { Thread.sleep(8000); } catch (InterruptedException e) { // } System.out.println("当前线程:" + Thread.currentThread().getName()); } finally { //释放锁 lockTemplate.releaseLock(lockInfo); } return R.ok(key); } }
ZooKeeper实现分布式锁的原理基于其临时顺序节点的特性,而不只是临时节点。因为使用临时节点容易造成羊群效应,所谓羊群效应就是指当这个分布式锁被占用时,其他想要获取该分布式锁的客户端会监听这个锁,等待锁被释放后,ZooKeeper会通知监听了这锁的客户端去重新获取这个锁。如果等待获取该锁的客户端非常多,那么大家同时来竞争这个锁,会瞬间增大ZooKeeper的压力。在ZooKeeper中,每个节点都是一个天然的顺序号生成器。在创建临时顺序节点类型的子节点时,ZooKeeper会自动为其分配一个唯一的顺序号。具体处理流程如下:
(1)、当客户端想要获取分布式锁 lock1,会在 lock1 这个目录里面创建一个临时顺序节点,这个临时顺序节点的序号是ZooKeeper根据当下系统中的序号依次递增的。
(2)、然后客户端会判断自己创建的这个临时顺序节点的序号在 lock1 这个目录里面的序号是不是最小的?如果是最小的,那么就表示客户端获取锁成功。如果不是,则获取锁失败,并把自己的监听器注册到Zookeeper,用来监听它前面一个临时顺序节点。
(3)、后续想要获取该锁的客户端依次顺序的创建一个临时顺序节点,每个创建的节点的序号也是依次递增的。
(4)、当客户端处理完业务逻辑之后,它就会去释放锁。释放锁的操作就是去删除 lock1 这个目录下面当前客户端所创建的临时顺序节点。删除以后,Zookeeper 就会去通知监听这个顺序节点的客户端。接收到 Zookeeper 的通知之后,再按照步骤(2)中的逻辑来判断是否能够获取锁。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>latest</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>latest</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>latest</version>
</dependency>
spring:
zookeeper:
connect-string: localhost:8081
namespace: test
@Component public class ZkLockUtil { @Autowired private CuratorFramework curatorFramework; /** * 获取分布式锁 * * @param lockPath 锁路径 * @param waitTime 等待时间 * @param leaseTime 锁持有时间 * @param timeUnit 时间单位 * @return 锁对象 * @throws Exception 获取锁异常 */ public InterProcessMutex acquire(String lockPath, long waitTime, long leaseTime, TimeUnit timeUnit) throws Exception { InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath); if (!lock.acquire(waitTime, timeUnit)) { throw new RuntimeException("获取分布式锁失败"); } if (leaseTime > 0) { lock.acquire(leaseTime, timeUnit); } return lock; } /** * 释放分布式锁 * * @param lock 锁对象 * @throws Exception 释放锁异常 */ public void release(InterProcessMutex lock) throws Exception { if (lock != null) { lock.release(); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。