赞
踩
微服务项目中,很多资源需要互斥使用,比如一些分布式任务,比如下单的处理,退货的处理等等。这些都需要用到借助分布式锁来保证处理的唯一性。 一开始我们也手工实现了分布式锁,但是随着业务的发展,我们对锁的特性也要求越来越完善,最后选用了Redis官方推荐的Redisson。
Spring Boot使用Redisson特别简单,只要引入一个依赖就可以,redis的配置跟其他的redis客户端可以兼容,可以不用再额外配置
- <dependency>
- <groupId>org.redisson</groupId>
- <artifactId>redisson-spring-boot-starter</artifactId>
- <version>3.13.2</version>
- </dependency>
- # Redis服务器地址
- spring.redis.host=127.0.0.1
- # Redis服务器连接端口
- spring.redis.port=6379
RedisDistributedLockApplication
启动类使用锁RedissonClient
,并实现业务逻辑在ApplicationRunner#run()
方法。
- package com.erbadagang.springboot.redisdistributedlock;
-
- import lombok.extern.slf4j.Slf4j;
- import org.redisson.api.RLock;
- import org.redisson.api.RedissonClient;
- import org.springframework.boot.ApplicationArguments;
- import org.springframework.boot.ApplicationRunner;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
-
- import javax.annotation.Resource;
-
- @Slf4j
- @SpringBootApplication
- public class RedisDistributedLockApplication implements ApplicationRunner {
-
- public static void main(String[] args) {
- SpringApplication.run(RedisDistributedLockApplication.class, args);
- }
-
- /**
- * 直接注入RedissonClient就可以直接使用.
- */
- @Resource
- private RedissonClient redissonClient;
-
- @Override
- public void run(ApplicationArguments args) throws Exception {
- log.info("spring boot run");
-
- //创建锁
- RLock helloLock = redissonClient.getLock("hello");
-
- //加锁
- helloLock.lock();
- try {
- log.info("locked");
- Thread.sleep(1000 * 10);
- } finally {
- //释放锁
- helloLock.unlock();
- }
- log.info("finished");
- }
- }
启动Redis和RedisDistributedLockApplication,控制台输出:
- 2020-08-02 22:51:17.169 INFO 8876 --- [main] c.e.s.r.RedisDistributedLockApplication : spring boot run
- 2020-08-02 22:51:36.486 INFO 8876 --- [main] c.e.s.r.RedisDistributedLockApplication : locked
- 2020-08-02 22:51:46.493 INFO 8876 --- [main] c.e.s.r.RedisDistributedLockApplication : finished
- void lock();
- void lock(long leaseTime, TimeUnit unit);
- boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
- boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
void lock()
:第一表示lock表示去加锁,加锁成功,没有返回值,继续执行下面代码;但是如果redis已经有这个锁了,它会一直阻塞,直到锁的时间失效(默认30秒),再继续往下执行。这个方法是要保证一定要抢到锁的,它的默认过期时间也是30秒,和tryLock()不同的是,它如果没抢占到锁,会一直自旋。void lock(long leaseTime, TimeUnit unit)
:和第一无参数lock逻辑一样,只是可以直接设置锁失效时间。用法:helloLock.lock(5, TimeUnit.SECONDS);
。boolean tryLock(long time, TimeUnit unit)
表示尝试去加锁(第一个参数表示the maximum time to wait for the lock),加锁成功,返回true,继续执行true下面代码;但是如果redis已经有这个锁了他会等待,还拿不到锁它会返回false,执行false的代码块。为了实现waitTime,使用了redis的订阅发布功能。也就是没有抢到锁的线程订阅消息,直至waitTime过期返回false或者被通知新一轮的开始抢占锁。当然,它如果抢占到锁,锁的过期时间也是30秒,同样也会存在一个定时任务续过期时间,保证业务执行时间不会超过过期时间,抢占失败即返回false。
- String key ="product:001";
- RLock lock = redisson.getLock(key);
- try {
- boolean res = lock.tryLock(10,TimeUnit.SECONDS);
- if ( res){
- System.out.println("这里是你的业务代码");
- }else{
- System.out.println("系统繁忙");
- }
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- lock.unlock();
- }
如果把lock.unlock();
注释,第一次执行正常加锁,可以跑到业务逻辑代码,快速第二次执行发现他等待10秒,如果拿不到锁就走else的系统繁忙逻辑。
tryLock(long waitTime, long leaseTime, TimeUnit unit)
表示尝试去加锁(第一个参数表示等待时间,第二个参数表示key的失效时间),加锁成功,返回true,继续执行true下面代码;如果返回false,它会等待第一个参数设置的时间,然后去执行false下面的代码。个方法的参数leaseTime如果不是-1的话,是不会有定时任务续过期时间的,也就存在业务处理时间可能超过过期时间的风险。其他的和tryLock(long waitTime, TimeUnit unit)一致。
boolean res = lock.tryLock(5,3, TimeUnit.SECONDS);
这种情况,锁3秒失效,我们配置的是等待5秒,在单机刷的情况下,肯定每次都能拿到锁。
- /**
- * 异步锁
- */
- lock = redissonClient.getLock("erbadagang-lock");
- Future<Boolean> res = null;
- try {
- // lock.lockAsync();
- // lock.lockAsync(100, TimeUnit.SECONDS);
- res = lock.tryLockAsync(3, 100, TimeUnit.SECONDS);
- if (res.get()) {
- System.out.println("这里是你的Async业务代码");
- } else {
- System.out.println("系统繁忙Async");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (res.get()) {
- lock.unlock();
- }
- }
- log.info("finished");
- }
Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。在提供了自动过期解锁功能的同时,保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。
- /**
- * 公平锁测试。
- */
- @Test
- public void testFairLock() {
- RLock fairLock = redissonClient.getFairLock("anyLock");
- try {
- // 最常见的使用方法
- fairLock.lock();
- // 支持过期解锁功能, 10秒钟以后自动解锁,无需调用unlock方法手动解锁
- fairLock.lock(10, TimeUnit.SECONDS);
- // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
- boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
- if (res) {
- System.out.println("这里是你的业务代码");
- } else {
- System.out.println("系统繁忙");
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- fairLock.unlock();
- }
- }
Redisson同时还为分布式可重入公平锁提供了异步执行的相关方法:
- RLock fairLock = redisson.getFairLock("anyLock");
- fairLock.lockAsync();
- fairLock.lockAsync(10, TimeUnit.SECONDS);
- Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);
Redis单节点配置:
- server:
- port: 8080
-
- spring:
- application:
- name: redis-distributed-lock
-
- ################ Redis ##############
- redis:
- host: 127.0.0.1
- port: 6379
- #password:
- timeout: 3000
- lettuce:
- pool:
- max-active: 8
- max-wait: -1
- max-idle: 8
- min-idle: 0
- redisson:
- config:
- # 单节点配置
- singleServerConfig:
- # 连接空闲超时,单位:毫秒
- idleConnectionTimeout: 10000
- pingTimeout: 1000
- # 连接超时,单位:毫秒
- connectTimeout: 10000
- # 命令等待超时,单位:毫秒
- timeout: 3000
- # 命令失败重试次数,如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。
- # 如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。
- retryAttempts: 3
- # 命令重试发送时间间隔,单位:毫秒
- retryInterval: 1500
- # 重新连接时间间隔,单位:毫秒
- reconnectionTimeout: 3000
- # 执行失败最大次数
- failedAttempts: 3
- # 密码
- password: null
- # 单个连接最大订阅数量
- subscriptionsPerConnection: 5
- # 客户端名称
- clientName: null
- # 节点地址
- address: redis://127.0.0.1:6379
- # 发布和订阅连接的最小空闲连接数
- subscriptionConnectionMinimumIdleSize: 1
- # 发布和订阅连接池大小
- subscriptionConnectionPoolSize: 50
- # 最小空闲连接数
- connectionMinimumIdleSize: 32
- # 连接池大小
- connectionPoolSize: 64
- # 数据库编号
- database: 0
- # DNS监测时间间隔,单位:毫秒
- dnsMonitoringInterval: 5000
- # 线程池数量,默认值: 当前处理核数量 * 2
- threads: 0
- # Netty线程池数量,默认值: 当前处理核数量 * 2
- nettyThreads: 0
- # 编码
- #codec: !<org.redisson.codec.JsonJacksonCodec> {}
- # 传输模式
- transportMode: "NIO"
Redis集群配置:
- spring:
- redis:
- redisson:
- config:
- clusterServersConfig:
- idleConnectionTimeout: 10000
- connectTimeout: 10000
- timeout: 3000
- retryAttempts: 3
- retryInterval: 1500
- failedSlaveReconnectionInterval: 3000
- failedSlaveCheckInterval: 60000
- password: null
- subscriptionsPerConnection: 5
- clientName: null
- loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
- subscriptionConnectionMinimumIdleSize: 1
- subscriptionConnectionPoolSize: 50
- slaveConnectionMinimumIdleSize: 24
- slaveConnectionPoolSize: 64
- masterConnectionMinimumIdleSize: 24
- masterConnectionPoolSize: 64
- readMode: "SLAVE"
- subscriptionMode: "SLAVE"
- nodeAddresses:
- - "redis://192.168.35.142:7002"
- - "redis://192.168.35.142:7001"
- - "redis://192.168.35.142:7000"
- scanInterval: 1000
- pingConnectionInterval: 0
- keepAlive: false
- tcpNoDelay: false
- threads: 16
- nettyThreads: 32
- #codec: !<org.redisson.codec.FstCodec> {}
- transportMode: "NIO"
多线程测试:
- package com.erbadagang.springboot.redisdistributedlock;
-
- import lombok.extern.slf4j.Slf4j;
- import org.junit.jupiter.api.Test;
- import org.redisson.api.RLock;
- import org.redisson.api.RedissonClient;
- import org.springframework.boot.test.context.SpringBootTest;
-
- import javax.annotation.Resource;
- import java.util.concurrent.CountDownLatch;
-
- @SpringBootTest
- @Slf4j
- class RedisDistributedLockApplicationTests {
-
-
- /**
- * 有锁测试共享变量
- */
- private Integer lockCount = 10;
-
- /**
- * 无锁测试共享变量
- */
- private Integer count = 10;
-
- /**
- * 模拟线程数
- */
- private static int threadNum = 10;
-
- /**
- * 直接注入RedissonClient就可以直接使用.
- */
- @Resource
- private RedissonClient redissonClient;
-
- /**
- * 模拟并发测试加锁和不加锁2个方法。
- *
- * @return
- */
- @Test
- public void lock() {
- // 计数器
- final CountDownLatch countDownLatch = new CountDownLatch(1);
- for (int i = 0; i < threadNum; i++) {
- MyRunnable myRunnable = new MyRunnable(countDownLatch);
- Thread myThread = new Thread(myRunnable);
- myThread.start();
- }
- // 释放所有线程
- countDownLatch.countDown();
- }
-
- /**
- * 加锁测试
- */
- private void testLockCount() {
- String lockKey = "lock-test";
- //创建锁
- RLock helloLock = redissonClient.getLock(lockKey);
-
- try {
- //加锁
- helloLock.lock();
- lockCount--;
- log.info("lockCount值:" + lockCount);
- } catch (Exception e) {
- log.error(e.getMessage(), e);
- } finally {
- // 释放锁
- helloLock.unlock();
- }
- }
-
- /**
- * 无锁测试
- */
- private void testCount() {
- count--;
- log.info("count值:" + count);
- }
-
-
- public class MyRunnable implements Runnable {
- /**
- * 计数器
- */
- final CountDownLatch countDownLatch;
-
- public MyRunnable(CountDownLatch countDownLatch) {
- this.countDownLatch = countDownLatch;
- }
-
- @Override
- public void run() {
- try {
- // 阻塞当前线程,直到计时器的值为0
- countDownLatch.await();
- } catch (InterruptedException e) {
- log.error(e.getMessage(), e);
- }
- // 无锁操作
- testCount();
- // 加锁操作
- testLockCount();
- }
-
- }
-
- }
控制台输出:
- 2020-08-02 23:55:39.832 INFO 4144 --- [ Thread-281] s.r.RedisDistributedLockApplicationTests : count值:3
- 2020-08-02 23:55:39.832 INFO 4144 --- [ Thread-283] s.r.RedisDistributedLockApplicationTests : count值:2
- 2020-08-02 23:55:39.832 INFO 4144 --- [ Thread-279] s.r.RedisDistributedLockApplicationTests : count值:5
- 2020-08-02 23:55:39.832 INFO 4144 --- [ Thread-282] s.r.RedisDistributedLockApplicationTests : count值:2
- 2020-08-02 23:55:39.832 INFO 4144 --- [ Thread-284] s.r.RedisDistributedLockApplicationTests : count值:2
- 2020-08-02 23:55:39.832 INFO 4144 --- [ Thread-278] s.r.RedisDistributedLockApplicationTests : count值:5
- 2020-08-02 23:55:39.832 INFO 4144 --- [ Thread-280] s.r.RedisDistributedLockApplicationTests : count值:4
- 2020-08-02 23:55:39.832 INFO 4144 --- [ Thread-275] s.r.RedisDistributedLockApplicationTests : count值:2
- 2020-08-02 23:55:39.832 INFO 4144 --- [ Thread-276] s.r.RedisDistributedLockApplicationTests : count值:2
- 2020-08-02 23:55:39.832 INFO 4144 --- [ Thread-277] s.r.RedisDistributedLockApplicationTests : count值:2
- 2020-08-02 23:55:39.848 INFO 4144 --- [ Thread-280] s.r.RedisDistributedLockApplicationTests : lockCount值:9
- 2020-08-02 23:55:39.848 INFO 4144 --- [ Thread-275] s.r.RedisDistributedLockApplicationTests : lockCount值:8
-
- 2020-08-02 23:55:39.883 INFO 4144 --- [ Thread-281] s.r.RedisDistributedLockApplicationTests : lockCount值:7
- 2020-08-02 23:55:39.885 INFO 4144 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
- 2020-08-02 23:55:39.885 INFO 4144 --- [ Thread-279] s.r.RedisDistributedLockApplicationTests : lockCount值:6
- 2020-08-02 23:55:39.885 INFO 4144 --- [ Thread-277] s.r.RedisDistributedLockApplicationTests : lockCount值:5
- 2020-08-02 23:55:39.885 INFO 4144 --- [ Thread-284] s.r.RedisDistributedLockApplicationTests : lockCount值:4
- 2020-08-02 23:55:39.903 INFO 4144 --- [ Thread-282] s.r.RedisDistributedLockApplicationTests : lockCount值:3
根据打印结果可以明显看到,未加锁的count--后值是乱序的,而加锁后的结果和我们预期的一样。由于条件问题没办法测试分布式的并发。只能模拟单服务的这种并发,但是原理是一样
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。