当前位置:   article > 正文

搭建Redis集群

搭建Redis集群

一 应用场景

为什么需要redis集群?

当主备复制场景,无法满足主机的单点故障时,需要引入集群配置。

一般数据库要处理的读请求远大于写请求 ,针对这种情况,我们优化数据库可以采用读写分离的策略。我们可以部 署一台主服务器主要用来处理写请求,部署多台从服务器 ,处理读请求。

二 基本原理

哨兵选举机制,如果有半数节点发现某个异常节点,共同决定改异常节点的状态,如果该节点是主节点,对应的备节点自动顶替为主节点。Sentinel(哨兵)是Redis 的高可用性解决方案:由一个或多个Sentinel 实例 组成的Sentinel 系统可以监视任意多个主服务器,以及这些主服务器属下的所有从服务器,并在被监视的主服务器进入下线状态时,自动将下线主服务器属下的某个从服务器升级为新的主服务器。

1.主从复制的作用

1、数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。

2、故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的冗余。

3、负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写Redis数据时应用连接主节点,读Redis数据时应用连接从节点),分担服务器负载;尤其是在写少读多的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。

4、读写分离:可以用于实现读写分离,主库写、从库读,读写分离不仅可以提高服务器的负载能力,同时可根据需求的变化,改变从库的数量。

5、高可用基石:除了上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础

2.配置集群所需的环境

Redis集群至少需要3个节点,因为投票容错机制要求超过半数节点认为某个节点挂了该节点才是挂了,所以2个节点无法构成集群。

要保证集群的高可用,需要每个节点都有从节点,也就是备份节点,所以Redis集群至少需要6台服务器。因为我没有那么多服务器,也启动不了那么多虚拟机,所在这里搭建的是伪分布式集群,即一台服务器虚拟运行6个redis实例,修改端口号为(7001-7006),当然实际生产环境的Redis集群搭建和这里是一样的。

槽点

在Redis集群中,槽(Slot)是一种逻辑上的划分方式,用于将数据按照Key进行分片存储。Redis集群默认将所有数据分成16384个槽,每个槽可以存储一个或多个键值对。

槽的作用是实现数据的分布式存储和负载均衡。通过对Key进行哈希计算,确定每个键值对应该存放在哪个槽中,从而将数据均匀地分配到整个集群中的各个节点上。这样可以保证数据在集群中的分布相对均衡,提高整个集群的性能和可扩展性。

在Redis集群中,每个节点负责处理一部分槽的数据。当一个节点加入或离开集群时,槽的重新分配会发生变化。集群中的主节点会根据需要将槽从一个节点迁移到另一个节点,以实现负载均衡和高可用性。

3.配置集群所需的环境

Redis集群至少需要3个节点,因为投票容错机制要求超过半数节点认为某个节点挂了该节点才是挂了,所以2个节点无法构成集群。

要保证集群的高可用,需要每个节点都有从节点,也就是备份节点,所以Redis集群至少需要6台服务器。因为我没有那么多服务器,也启动不了那么多虚拟机,所在这里搭建的是伪分布式集群,即一台服务器虚拟运行6个redis实例,修改端口号为(7001-7006),当然实际生产环境的Redis集群搭建和这里是一样的。

(1).规划网络。

用一台虚拟机模拟6个节点,一台机器6个节点,创建出3 master、3 salve 环境。虚拟机是 CentOS7 ,ip地址192.168.195.33

或者

创建六台服务器,步骤与一台创建六个节点效果相同

(2).创建 Redis 节点

首先在 192.168.195.33  机器上 /usr/目录下创建 myredis目录;

mkdir myredis

(创建六台服务器的克隆服务器安装配置好redis就可以,不用执行2、3)

(3).创建目录

在 myredis 目录下,创建名为7001、7002,7003、7004、7005,7006的目录

mkdir 7001 7002 7003 7004 7005 7006

 (4).配置7001、7002,7003、7004、7005,7006

先在7001目录配置,添加redis.conf文件

写入

  1. include /usr/th/redis/redis-5.0.14/redis.conf //th是我自己的目录
  2. port 7001
  3. pidfile "/var/run/redis_7001.pid"
  4. dbfilename "dump_7001.rdb"
  5. dir "/usr/myredis/7001"
  6. logfile "/usr/myredis/7001/redis_err_7001.log"
  7. bind 0.0.0.0
  8. protected-mode no
  9. daemonize yes
  10. cluster-enabled yes
  11. cluster-config-file nodes-7001.conf
  12. cluster-node-timeout 15000

(配置内容参考redis主从复制的文章)

机器一般不设置密码

(5).将 7001的redis.conf 拷贝到这六个目录中

echo ./7002 ./7003 ./7004 ./7005 ./7006 | xargs -n 1 cp -v /usr/myredis/7001/redis.conf

拷贝完然后更改里面的端口号可以了

(6)启动

一键启动,在myredis中写一个stredis的脚本,并给脚本赋权限

  1. #!/bin/bash
  2. /usr/local/bin/redis-server /usr/myredis/7001/redis.conf
  3. /usr/local/bin/redis-server /usr/myredis/7002/redis.conf
  4. /usr/local/bin/redis-server /usr/myredis/7003/redis.conf
  5. /usr/local/bin/redis-server /usr/myredis/7004/redis.conf
  6. /usr/local/bin/redis-server /usr/myredis/7005/redis.conf
  7. /usr/local/bin/redis-server /usr/myredis/7006/redis.conf

 (7) 创建redis的集群

Redis-cli --cluster create 节点(ip:端口号)

--cluster-replicas 1

分配主从 比例1

/usr/local/bin/redis-cli --cluster create 192.168.195.33:7001 192.168.195.33:7002 192.168.195.33:7003 192.168.195.33:7004 192.168.195.33:7005 192.168.195.33:7006 --cluster-replicas 1

六个节点  三个主节点 三个从节点配置成功如下图

(8)使用cli连接redis集群

使用客户端链接集群 必须使用 -c 连接

/usr/java/redis/bin/redis-cli -c -h 192.168.159.34 -p 7002

查看集群的节点的信息 :cluster nodes

 

(9)检查节点的信息 

 /usr/local/bin/redis-cli --cluster check 192.168.184.33:7003

(10)添加主节点

配置文件 7007 /redis.conf

在添加之前首先要像7001-6一样,修改配置文件

启动7007 服务 

添加主节点

/usr/java/redis/bin/redis-cli --cluster add-node 192.168.159.34:7007 192.168.159.34:7002

前面的IP加端口号是要添加的redis节点,后面的IP和端口号是集群中的任意一个节点。

 表示添加成功

(11) 配置从节点 

刚才添加的主节点还没有分配槽,所以无法使用

添加从节点之前需要设置从节点并启动节点

配置7008成为7007的从节点

 /usr/local/bin/redis-cli --cluster add-node 192.168.195.33:7008 192.168.195.33:7002 --cluster-slave --cluster-master-id 9b1df1481e5a40bf6028dcd036a823cca4f880f7

主节点分配槽 

槽在创建集群时就已分配只有那么多,所有只能重分配

 /usr/local/bin/redis-cli --cluster reshard 192.168.195.33:7001

查看7007,可以看到已分配500个槽

 (12)删除从节点

 --cluster del-node 集群的节点 节点的id

 /usr/local/bin/redis-cli --cluster del-node 192.168.195.33:7001 d1fcea6d16a4434493f6615e48c5bc9ba89364a0

(12)删除主节点

先将槽归还给集群

槽点为归0删不了

/usr/local/bin/redis-cli --cluster reshard 192.168.195.33:7001

槽点为0

删除主节点(只有槽点为0后才能删除主节点)

(13)测试集群 

关闭7002后7005成为主节点

 /usr/local/bin/redis-cli -p 7002 shutdown(优雅的关闭)

 

 如果7002重新启动成为7005的从节点

 优雅的关闭脚本 不能kill -9

 vim stpredis 

  1. #!/bin/bash
  2. /usr/local/bin/redis-cli -h 192.168.184.33 -p 7001 shutdown
  3. /usr/local/bin/redis-cli -h 192.168.184.33 -p 7002 shutdown
  4. /usr/local/bin/redis-cli -h 192.168.184.33 -p 7003 shutdown
  5. /usr/local/bin/redis-cli -h 192.168.184.33 -p 7004 shutdown
  6. /usr/local/bin/redis-cli -h 192.168.184.33 -p 7005 shutdown

三 slots

一个 Redis 集群包含 16384 个插槽(hash slot), 数据库中的每个键都属于这 16384 个插槽的其中一个,

集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽, 其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和 。

name 2 [0-5460]

集群中的每个节点负责处理一部分插槽。 举个例子, 如果一个集群可以有主节点, 其中:

节点 A 负责处理 0 号至 5460 号插槽。

节点 B 负责处理 5461 号至 10922 号插槽。

节点 C 负责处理 10923 号至 16383 号插槽。

1.在集群中录入值

在redis-cli每次录入、查询键值,redis都会计算出该key应该送往的插槽,如果不是该客户端对应服务器的插槽,redis会报错,并告知应前往的redis实例地址和端口。

redis-cli客户端提供了 –c 参数实现自动重定向。

如 redis-cli -c –p 7000登入后,再录入、查询键值对可以自动重定向。

批量添加

不在一个slot下的键值,是不能使用mget,mset等多键操作。

可以通过{}来定义组的概念,从而使key中{}内相同内容的键值对放到一个slot中去。(按组分配插槽)

2.redis集群的优点

实现扩容

分摊压力

无中心配置相对简单

3.redis集群的不足

多键操作是不被支持的

多键的Redis事务是不被支持的

由于集群方案出现较晚,很多公司已经采用了其他的集群方案,而代理或者客户端分片的方案想要迁移至redis cluster,需要整体迁移而不是逐步过渡,复杂度较大。

四 Redis的应用问题

1.缓存穿透

缓存穿透是指缓存和数据库中都没有的数据,导致所有的请求都落到数据库上,造成数据库短时间内承受大量请求而崩掉。

1.1问题描述

key对应的数据在数据库并不存在,每次针对此key的请求从缓存获取不到,请求都会压到数据库,从而可能压垮数据库。比如用一个不存在的用户id获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库。

1.2解决方案

一个一定不存在缓存及查询不到的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。

解决方案:

(1) 对空值缓存:如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟

(2) 设置可访问的名单(白名单):

使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,不允许访问。

(3) 采用布隆过滤器:(布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。

布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。)

将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被 这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。

(4) 进行实时监控:当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务

2.缓存击穿

缓存击穿是指缓存中没有但数据库中有的数据(一般是缓存时间到期),这时由于并发用户特别多,同时读缓存没读到数据,又同时去数据库去取数据,引起数据库压力瞬间增大,造成过大压力。和缓存雪崩不同的是,缓存击穿指并发查同一条数据,缓存雪崩是不同数据都过期了,很多数据都查不到从而查数据库。

2.1问题描述

key对应的数据存在,但在redis中没有过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。

2.2解决方案

key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题。

解决问题:

(1)预先设置热门数据:在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长

(2)实时调整:现场监控哪些数据热门,实时调整key的过期时长

(3)使用锁:

(1) 就是在缓存失效的时候(判断拿出来的值为空),不是立即去加载数据库。

(2) 先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX)去set一个mutex key

(3) 当操作返回成功时,再进行load db的操作,并回设缓存,最后删除mutex key;

(4) 当操作返回失败,证明有线程在load db,当前线程睡眠一段时间再重试整个get缓存的方法。

setnx 分布式锁

3.缓存雪崩

缓存雪崩是指缓存同一时间大面积的失效,所以,后面的请求都会落到数据库上,造成数据库短时间内承受大量请求而崩掉。

3.1问题描述

key对应的数据存在,但在redis中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。

缓存雪崩与缓存击穿的区别在于这里针对很多key缓存,击穿则是某一个key正常访问

缓存失效瞬间

3.2解决方案

缓存失效时的雪崩效应对底层系统的冲击非常可怕!

解决方案:

(1) 构建多级缓存架构:nginx缓存 + redis缓存 +其他缓存(ehcache等)

(2) 使用锁或队列:

5000 1000

用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况

(3) 设置过期标志更新缓存:

记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。

(4) 将缓存失效时间分散开:

比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

五 分布式锁

问题描述

随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

分布式锁主流的实现方案:

1. 基于数据库实现分布式锁

2. 基于缓存(Redis等)

3. 基于Zookeeper

每一种分布式锁解决方案都有各自的优缺点:

1. 性能:redis最高

2. 可靠性:zookeeper最高

这里,我们就基于redis实现分布式锁。

解决方案:使用redis实现分布式锁

redis:命令

# set sku:1:info “OK” NX PX 10000

EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。

PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。

NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。

XX :只在键已经存在时,才对键进行设置操作。

setnx 键不存在的时候 设值

setnx lock test

业务逻辑

del lock

1. 多个客户端同时获取锁(setnx)

2. 获取成功,执行业务逻辑{从db获取数据,放入缓存},执行完成释放锁(del)

3. 其他客户端等待重试

问题:删除操作缺乏原子性。

场景:

1. index1执行删除时,查询到的lock值确实和uuid相等

uuid=v1

set(lock,uuid);

2. index1执行删除前,lock刚好过期时间已到,被redis自动释放

在redis中没有了lock,没有了锁。

3. index2获取了lock

index2线程获取到了cpu的资源,开始执行方法

uuid=v2

set(lock,uuid);

4. index1执行删除,此时会把index2的lock删除

index1 因为已经在方法中了,所以不需要重新上锁。index1有执行的权限。index1已经比较完成了,这个时候,开始执行

删除的index2的锁!

LUA脚本保证删除的原子性

KEYS[1] 用来表示在redis 中用作键值的参数占位,主要用來传递在redis 中用作keyz值的参数。

ARGV[1] 用来表示在redis 中用作参数的占位,主要用来传递在redis中用做 value值的参数。

  1. @GetMapping("testLockLua")
  2. public void testLockLua() {
  3. //1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
  4. String uuid = UUID.randomUUID().toString();
  5. //2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
  6. String skuId = "25"; // 访问skuId 为25号的商品
  7. String locKey = "lock:" + skuId; // 锁住的是每个商品的数据
  8. // 3 获取锁
  9. Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);
  10. // 第一种: lock 与过期时间中间不写任何的代码。
  11. // redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间
  12. // 如果true
  13. if (lock) {
  14. // 执行的业务逻辑开始
  15. // 获取缓存中的num 数据
  16. Object value = redisTemplate.opsForValue().get("num");
  17. // 如果是空直接返回
  18. if (StringUtils.isEmpty(value)) {
  19. return;
  20. }
  21. // 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在!
  22. int num = Integer.parseInt(value + "");
  23. // 使num 每次+1 放入缓存
  24. redisTemplate.opsForValue().set("num", String.valueOf(++num));
  25. /*使用lua脚本来锁*/
  26. // 定义lua 脚本
  27. String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
  28. // 使用redis执行lua执行
  29. DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
  30. redisScript.setScriptText(script);
  31. // 设置一下返回值类型 为Long
  32. // 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
  33. // 那么返回字符串与0 会有发生错误。
  34. redisScript.setResultType(Long.class);
  35. // 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
  36. redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
  37. } else {
  38. // 其他线程等待
  39. try {
  40. // 睡眠
  41. Thread.sleep(1000);
  42. // 睡醒了之后,调用方法。
  43. testLockLua();
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. }

if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end

keys: k2

val: 2

k2 1

lUA脚本

Lua 是一个小巧的脚本语言,Lua脚本可以很容易的被C/C++ 代码调用,也可以反过来调用C/C++的函数,Lua并没有提供强大的库,一个完整的Lua解释器不过200k,所以Lua不适合作为开发独立应用程序的语言,而是作为嵌入式脚本语言。

很多应用程序、游戏使用LUA作为自己的嵌入式脚本语言,以此来实现可配置性、可扩展性。

这其中包括魔兽争霸地图、魔兽世界、博德之门、愤怒的小鸟等众多游戏插件或外挂。

Lua 教程_w3cschool

Lua脚本基础入门及其案例_51CTO博客_lua脚本语言入门

LUA脚本在Redis中的优势

将复杂的或者多步的redis操作,写为一个脚本,一次提交给redis执行,减少反复连接redis的次数。提升性能。

LUA脚本是类似redis事务,有一定的原子性,不会被其他命令插队,可以完成一些redis事务性的操作。

但是注意redis的lua脚本功能,只有在Redis 2.6以上的版本才可以使用。

lua脚本示例

  1. redis.call('set',KEYS[1],ARGV[1])
  2. redis.call('set',KEYS[2],ARGV[2])
  3. local n1 = tonumber(redis.call('get',KEYS[1]))
  4. local n2 = tonumber(redis.call('get',KEYS[2]))
  5. if n1 > n2 then
  6. return 1
  7. end
  8. if n1 == n2 then
  9. return 0
  10. end
  11. if n1 < n2 then
  12. return 2
  13. end
  1. redis-cli --eval set.lua K1 K2 , 18 20
  2. 注意事项: key 和参数之间要用,隔开 并且,前后两端还有空格

lua脚本示例

  1. local userid=KEYS[1];
  2. local prodid=KEYS[2];
  3. local qtkey="sk:"..prodid..":qt";
  4. local usersKey="sk:"..prodid..":usr";
  5. local userExists=redis.call("sismember",usersKey,userid);
  6. if tonumber(userExists)==1 then
  7. return 2;
  8. end
  9. local num= redis.call("get" ,qtkey);
  10. if tonumber(num)<=0 then
  11. return 0;
  12. else
  13. redis.call("decr",qtkey);
  14. redis.call("sadd",usersKey,userid);
  15. end
  16. return 1;

出错

针对如上错误,作如下处理:

1)查看打开文件的上限和redis服务进程,修改上限

输入如下命令,查看其上限:

ulimit -a

设置上限

ulimit -n 10032

redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。简单说就是redis在分布式系统上工具的集合,Redission提供了分布式锁的多种多样的功能.

使用redissoncheck

自定义redis分布式锁无法自动续期,比如,一个锁设置了1分钟超时释放,如果拿到这个锁的线程在一分钟内没有执行完毕,那么这个锁就会被其他线程拿到,可能会导致严重的线上问题,在秒杀场景下,很容易因为这个缺陷导致的超卖了。

10 2s

1s

redission 超时时间1m 执行逻辑的时候3m

锁的分类:

1 、乐观锁与悲观锁

乐观锁

悲观锁

2 、可重⼊锁和⾮可重⼊锁

可重⼊锁:当在⼀个线程中第⼀次成功获取锁之后,在此线程中就可以再次获取

⾮可重⼊锁

3 、公平锁和⾮公平锁

公平锁:按照线程的先后顺序获取锁

⾮公平锁:多个线程随机获取锁

4 、阻塞锁和⾮阻塞锁

阻塞锁:不断尝试获取锁,直到获取到锁为⽌

⾮阻塞锁:如果获取不到锁就放弃,但可以⽀持在⼀定时间段内的重试

—— 在⼀段时间内如果没有获取到锁就放弃

Redission的使⽤

1 、获取锁 —— 公平锁和⾮公平锁

  1. // 获取公平锁
  2. RLock lock = redissonClient . getFairLock ( skuId );
  3. // 获取⾮公平锁
  4. RLock lock = redissonClient . getLock ( skuId );

2 、加锁 —— 阻塞锁和⾮阻塞锁

  1. // 阻塞锁(如果加锁成功之后,超时时间为 30s ;加锁成功开启看⻔狗,剩 5s 延⻓过期时间)
  2. lock . lock ();
  3. // 阻塞锁(如果加锁成功之后,设置⾃定义 20s 的超时时间)
  4. lock . lock ( 20 , TimeUnit . SECONDS );
  5. // ⾮阻塞锁(设置等待时间为 3s ;如果加锁成功默认超时间为 30s )
  6. boolean b = lock . tryLock ( 3 , TimeUnit . SECONDS );
  7. // ⾮阻塞锁(设置等待时间为 3s ;如果加锁成功设置⾃定义超时间为 20s )
  8. boolean b = lock . tryLock ( 3 , 20 , TimeUnit . SECONDS );

3 、释放锁

 lock . unlock ();

4 、应⽤示例

  1. // 公平⾮阻塞锁
  2. RLock lock = redissonClient . getFairLock ( skuId );
  3. boolean b = lock . tryLock ( 3 , 20 , TimeUnit . SECONDS );

Redisson 锁加锁流程:线程去获取锁,获取成功则执行lua脚本,保存数据到redis数据库。如果获取失败: 一直通过while循环尝试获取锁(可自定义等待时间,超时后返回失败)。Redisson提供的分布式锁是支持锁自动续期的,也就是说,如果线程仍旧没有执行完,那么redisson会自动给redis中的目标key延长超时时间,这在Redisson中称之为 Watch Dog 机制。

  1. @Autowired
  2. private RedissonClient redissonClient;
  3. public void method1() {
  4. RLock lock = redissonClient.getLock("lock");
  5. boolean isLock = lock.tryLock();
  6. if (!isLock) {
  7. log.error("获取锁失败,1");
  8. }
  9. try {
  10. log.info("获取锁成功,1");
  11. method2();
  12. }finally {
  13. log.info("释放锁,1");
  14. lock.unlock();
  15. }
  16. }
  17. public void method2() {
  18. RLock lock = redissonClient.getLock("lock");
  19. boolean isLock = lock.tryLock();
  20. if (!isLock) {
  21. log.error("获取锁失败,2");
  22. }
  23. try {
  24. log.info("获取锁成功,2");
  25. }finally {
  26. log.info("释放锁,2");
  27. lock.unlock();
  28. }
  29. }

使用redission

加jar

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-data-redis</artifactId>
  4. </dependency>
  5. <!--配置redission-->
  6. <dependency>
  7. <groupId>org.redisson</groupId>
  8. <artifactId>redisson-spring-boot-starter</artifactId>
  9. <version>3.12.5</version>
  10. </dependency>

application文件

  1. # 设置redis的信息
  2. spring.redis.host=192.168.174.72
  3. spring.redis.database=0
  4. spring.redis.password=yyl
  5. spring.redis.port=6379

配置类:

  1. package com.example.bootdemo.aaa.config;
  2. import org.redisson.Redisson;
  3. import org.redisson.api.RedissonClient;
  4. import org.redisson.config.Config;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. public class RedissonConfig {
  10. @Value("${spring.redis.host}")
  11. private String host;
  12. @Value("${spring.redis.port}")
  13. private String port;
  14. @Value("${spring.redis.password}")
  15. private String redisPassword;
  16. @Bean
  17. public RedissonClient getRedisson(){
  18. Config config = new Config();
  19. // //多节点config.useClusterServers()
  20. //单机模式 依次设置redis地址和密码
  21. config.useSingleServer().
  22. setAddress("redis://" + host + ":" + port).
  23. setPassword(redisPassword);
  24. return Redisson.create(config);
  25. }
  26. }

controller

  1. package com.example.bootdemo.aaa.controller;
  2. import org.redisson.api.RLock;
  3. import org.redisson.api.RedissonClient;
  4. import org.springframework.data.redis.core.StringRedisTemplate;
  5. import org.springframework.web.bind.annotation.PostMapping;
  6. import org.springframework.web.bind.annotation.RequestMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. import javax.annotation.Resource;
  9. import java.util.Objects;
  10. @RestController
  11. @RequestMapping("/redisLock")
  12. public class RedisLockController {
  13. @Resource
  14. private StringRedisTemplate stringRedisTemplate;
  15. @Resource
  16. private RedissonClient redisson;
  17. private static final String REDIS_KEY = "redis_test";
  18. private static final int MAX_SIZE = 10;
  19. /**
  20. * 初始化库存
  21. */
  22. @PostMapping("/init")
  23. public void init() {
  24. stringRedisTemplate.opsForValue().set(REDIS_KEY, String.valueOf(MAX_SIZE));
  25. }
  26. /**
  27. * 扣库存业务
  28. */
  29. @PostMapping("/test")
  30. public void exportInventory() {
  31. String lockKey = "product001";
  32. RLock lock = redisson.getLock(lockKey);
  33. try {
  34. lock.lock();
  35. int s = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(REDIS_KEY)));
  36. System.out.printf("1号服务:库存当前为:" + s + "\n");
  37. //stringRedisTemplate.opsForValue().set(REDIS_KEY, String.valueOf(s));
  38. if(s>0) {
  39. stringRedisTemplate.opsForValue().decrement(REDIS_KEY);
  40. }
  41. } catch (Exception e) {
  42. } finally {
  43. lock.unlock();
  44. }
  45. }
  46. }

使用jmeter测试:

写两个一模一样的项目 通过jmeter访问 redisLock/test

两个服务中的数据不重复即可

Redisson分布式锁入门使用(可重入锁(lock))_spring boot_人生没有第三次-华为云开发者联盟

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/blog/article/detail/48222
推荐阅读
相关标签
  

闽ICP备14008679号