赞
踩
高性能键值对
(key-value)的内存数据库,可以用做数据库、缓存、消息中间件
等。NoSQL
(not-only sql,泛指非关系型数据库)的数据库。数据在内存中
,读写速度非常快
,支持并发10W QPS。单进程单线程
,是线程安全的,采用IO多路复用机制
。丰富的数据类型,支持字符串(strings)、哈希(hashes)、列表(lists)、集合(sets)、有序集合(sorted sets)等
。主从复制,哨兵,高可用
。可以用作分布式锁
, 可以作为消息中间件
使用,支持发布订阅
。Redis支持多种数据类型,其中比较常见的有五种:
字符串
(String)哈希
(Hash)列表
(List)集合
(Set)有序集合
(Sorted Set)Streams、Bitmap、Geospatial以及HyperLogLog
先展示一个Redis数据类型和底层数据结构的关系图
① String(字符串)[动态字符串]
String的底层是一个动态字符串。
②List(列表类型)[双向链表、压缩表]
③ Hash(散列类型)[压缩列表、哈希表]
④ Set(集合类型)[整数数组、哈希表]
Set(集合)的底层数据结构有两种实现方式,分别是哈希表(Hash Table)和整数数组(Intset)。选择哪种数据结构取决于集合中元素的特点。
哈希表(Hash Table): 当集合中的元素是较为复杂的字符串类型,或者元素的数量较多时,Redis 会使用哈希表作为集合的底层数据结构。哈希表允许存储更多类型的数据,但在存储和内存使用上相对较大。
整数数组(Intset): 当集合中的元素都是整数类型,且元素数量较少时,Redis 会使用整数数组作为集合的底层数据结构。整数数组可以在内存使用上更加紧凑,从而节省空间。
需要注意的是,Redis 在判断是否使用整数数组还是哈希表时,会根据集合中元素的特点来做出选择。当集合中的元素全部为整数且符合一定条件(比如元素范围在一定区间内)时,会优先选择整数数组,否则会使用哈希表。
⑤ SortedSet(有序集合类型,简称zset)[压缩表、跳表]
压缩表:
压缩列表实际上类似于一个数组,数组中的每个元素都对应保存一个数据。压缩列表采用一种紧凑的方式存储
数据,可以有效地节省内存空间
。和数组不同的是,压缩列表在表头有三个字段zlbytes、zltail和zllen
,分别表示列表长度
、列表尾的偏移量
和列表中entry个数
;压缩列表的尾部还有一个zlend,表示列表结束。在压缩列表中,如果要查找定位第一个元素和最后一个元素,可以通过表头三个字段的长度直接定位,时间复杂度是O(1)。而查找其他元素时,就没那么高效了,只能逐个查找,时间复杂是O(N)。
跳表:
有序链表
只能逐一查找元素,导致操作起来非常缓慢,于是就出现了跳表。具体来说,跳表在链表的基础上,增加了多级索引
,通过索引位置的几个跳转,实现数据的快速定位。
最后看一下各自时间复杂度:
节省内存空间
。整数数组和压缩列表都是在内存中分配一块地址连续的空间
,然后把集合中的元素一个接一个地放在这块空间内,非常紧凑。因为元素是挨个连续放置的,我们不用再通过额外的指针把元素串接起来,这就避免了额外指针带来的空间开销。充分利用CPU缓存
,因为数组和压缩列表的内存是连续的,符合程序的局部性原理,就可以充分利用CPU高速缓存,速度会更快。hash和跳表
,保证查询效率。Redis三种特殊类型:bitmaps,hyperloglogs、地理空间(gepspatial)
会按照顺序执行
。Redis单条命令式保持原子性,但是事务不保证原子性
。redis的事务:
- 开启事务(multi)
- 命令入队(……)
- 执行事务(exec)
注意:
发生编译型异常(即命令错误),事务中所有命令都不会被执行。
发生运行时异常,其他命令可以正常执行,错误命令抛出异常。
- 首先我们要明白,Redis是单线程,主要是指
Redis的网络IO和键值对读写
是由一个网络来完成的,这也是Redis对外提供键值存储服务的主要流程。但Redis其他功能,比如持久化、异步删除、集群同步等,都是由额外的线程执行的。- 假如我们使用的是多线程,那么多线程是能提高系统的吞吐量,但是多线程环境下
资源是共享
的,所以就会出现多线程编程模式面临的共享资源的并发访问控制问题
,即使增加了线程,大部分线程也在等待获取访问共享资源的互斥锁,并行变串行,系统的吞吐量并没有随着线程的增加而增加。
Redis 之所以如此快,主要有以下几个方面的原因:
基于内存
: Redis是一种基于内存的数据库,数据存储在内存中,数据的读写速度非常快,因为内存访问速度比硬盘访问速度快得多。单线程模型
: Redis使用单线程模型,这意味着它的所有操作都是在一个线程内完成的,不需要进行线程切换和上下文切换。这大大提高了Redis的运行效率和响应速度。多路复用I/О模型
:Redis 在单线程的基础上,采用了V/O多路复用技术,实现了单个线程同时处理多个客户端连接的能力,从而提高了Redis的并发性能。高效的数据结构
: Redis 提供了多种高效的数据结构,如哈希表、跳表,这是它能实现高性能的一个重要原因。指一个线程处理多个 IO 流
。redis是基于内存
的,假设我们不做任何操作,只要redis重启(或者中途故障挂掉了),那么redis的数据就没掉了,那么我们也不想内存中的数据没掉吧,所以redis提供了持久化机制给我们使用,分别是AOF日志和RDB快照。
Redis持久化机制一共有两种,分别是AOF日志
和RDB快照
。
记录到日志中
,Redis重跑一遍这个日志记录下的日志就相当于还原了数据。Redis就会去生成RDB文件
。save:在主线程中执行,会导致阻塞;
bgsave:创建一个子进程,专门用于写入 RDB 文件,避免了主线程的阻塞,这也是 Redis RDB 文件生成的默认配置。
Always同步写回
,Everysec每秒写回
,No手动写回
;这个时候Redis会使用重写机制
。
AOF重写过程是由后台子进程bgrewriteaof来完成的。总结为“一个拷贝,两处日志”。
“一个拷贝”就是指,每次执行重写时,主线程 fork 出后台的 bgrewriteaof 子进程
。此时,fork 会把主线程的内存拷贝一份给 bgrewriteaof 子进程,这里面就包含了数据库的最新数据。然后,bgrewriteaof 子进程就可以在不影响主线程的情况下,逐一把拷贝的数据写成操作,记入重写日志。
“两处日志”又是什么呢?
因为主线程未阻塞,仍然可以处理新来的操作。此时,如果有写操作,第一处日志就是指正在使用的 AOF 日志,Redis 会把这个操作写到它的缓冲区
。这样一来,即使宕机了,这个 AOF 日志的操作仍然是齐全的,可以用于恢复。而第二处日志,就是指新的 AOF 重写日志,这个操作也会被写到重写日志的缓冲区
。这样,重写日志也不会丢失最新的操作。等到拷贝数据的所有操作记录重写完成后,重写日志记录的这些最新操作也会写入新的 AOF 文件,以保证数据库最新状态的记录。此时,我们就可以用新的 AOF 文件替代旧文件了。
RDB快照:
Redis 的数据都在内存中,为了提供所有数据的可靠性保证,它执行的是全量快照,也就是说,把内存中的所有数据都记录到磁盘中。
Redis 提供了两个命令来生成 RDB 文件,分别是 save 和 bgsave。
save:在主线程中执行,会导致阻塞;
bgsave:创建一个子进程,专门用于写入 RDB 文件,避免了主线程的阻塞,这也是 Redis RDB 文件生成的默认配置。
那么在执行bgsave时,在子线程去执行RDB文件,那么快照时的数据可修改吗?
这个时候主线程没有阻塞,是可以正常进行接收请求的,但是为了保证快照的完整性,他只能处理读操作,不能修改正在执行快照的数据。这显然不合理呀,为了实现主线程可以处理修改数据,所以Redis就会借助操作系统提供的写时复制技术
(Copy-On-Write,COW),在执行快照的同时,正常处理写操作。
那么问题又来了?多久做一次快照合适呢?频繁做全量快照又会带来两方面性能的开销:
所以我们在做了一次全量快照之后,后面增量快照,只对修改的数据做快照记录,这样避免了全量快照的开销。
但是这么做的前提是我们需要记录修改的信息,需要花费额外的空间去记录这些信息的修改,如果修改信息很大,额外空间的花销也很大,对于内存宝贵的Redis来说,有些得不偿失。
Redis4.0提出了一种混合使用AOF日志和内存快照的方法。简单来说,内存快照以一定的频率执行,在两次快照之间,使用AOF日志记录这期间的所有命令操作
。
主要看业务场景吧,那么业务上如果允许重启时部分数据丢失的,那么开启RDB就够了,RDB启动时会比AOF快很多。但是如果是数据是有容忍度的,建议开启RDB和AOF一起用,官方建议也是都开启。
最后关于RDB快照和AOF日志选择问题:
Redis有三种主要的集群模式,用于在分布式环境中实现高可用性和数据复制。这些集群模式分别是: 主从复制
(Master-Slave Replication)、哨兵模式
(Sentinel)和Redis Cluster模式
。
主从复制:
主节点负责处理所有写操作和读操作
,而从节点则复制主节点的数据,并且只能处理读操作。当主节点发生故障时,可以将一个从节点升级为主节点,实现故障转移(需要手动实现)。适用于读多写少的场景
。它提供了数据备份功能,并且可以有很好的扩展性,只要增加更多的从节点,就能让整个集群的读的能力不断提升。不具备故障自动转移的能力,没有办法做容错和恢复
。哨兵模式:
哨兵模式是在主从复制的基础上加入了哨兵节点
。哨兵节点是一种特殊的Redis节点,用于监控主节点和从节点的状态
。当主节点发生故障时,哨兵节点可以自动进行故障转移,选择一个合适的从节点升级为主节点,并通知其他从节点和应用程序进行更新。Redis Cluster模式:
主从库第一同步流程:
主从连接的网络断了,Redis的处理方式是使用主从的增量同步。有一个环形缓冲区repl_backing_buffer,当主从断开连接,主机会把断开连接期间的数据写入 replication buffer 和repl_backing_buffer中,在repl_backing_buffer主机会用偏移量master_repl_offerset记录自己写的位置,从机也会使用slave_repl_offerset记录自己读到的位置。
从机重新连接,从机会发送psync 我主机runID,偏移量offset给主机,主机会从repl_backing_buffer读取断开连接期间的数据。
思考一个问题?repl_backing_buffer是一个环形缓冲区,如果缓冲区写满之后,主库会继续写入,就会覆盖掉之前写入的操作。所以还需要一个参数来控制,repl_backing_size缓冲区大小,如果写入的数据超过缓冲区大小,那就会进行全量同步。
很显然是会的,从【主从复制】流程来看,这个过程是异步的(在复制的过程中:主服务器会一直接收请求,然后把命令发送给从服务器),假如主服务器的命令还没有发送完给从服务器,自己就挂了。这个时候要让从服务器顶替主服务器,但从服务器的数据是不全的。
还有另外一种情况就是:有可能哨兵认为主服务器挂了,但是真实的主服务器其实并没有挂(网络抖动),而哨兵已经选取了一台服务器当做主服务器,此时客户端还没有反应过来,还继续向旧服务器写数据。等到旧服务器连接的时候,已经被纳入从服务器了,所以那段时间里,写的数据就丢失了。
哨兵模式。哨兵主干的事情主要就是:监控(监控主服务器的状态)、选主(主服务器挂了,在从服务器选出一个作为主服务器)、通知(故障发送消息给管理员)和配置(作为配置中心,提供当前主服务器的信息)。
可以把「哨兵」当做是运行在「特殊」模式下的Redis服务器,为了「高可用」,哨兵也是集群架构的。首先它需要跟Redis主服务器创建对应连接(获取他们的信息)。
每个哨兵都会不断的ping命令看主服务器有没有下线,如果主服务器在【配置时间】内没有正常响应,那么当前哨兵就主观的认为该服务器下线;其他哨兵同样也会ping该主服务器,如果足够多的哨兵认为该主服务器已经下线,那就认为【客观下线】,这时就需要对主服务器执行故障转移。
哨兵之间也会挑选出来一个领头, 由领头对已下线的服务器进行故障转移。
首先会在从服务器上挑选一个来作为主服务器,这里挑选是按照,比如:(从库配置优先级、要判断哪个服务器的复制进度【offset】最大、【RUNID】大小、跟master断开的连接时长等)。
然后,以前的从服务器都需要跟新的主服务器进行主从复制。已经下线的主服务器,再次重新连接的时候,需要让他成为新的主服务器的从服务器。
Redis数据集分割成多个部分
,分别存储在不同的Redis节点上
的技术。它可以用于将一个单独的Redis数据库扩展到多个物理机器上,从而提高Redis集群的性能和可扩展性。哈希槽(hash slot)的方式来进行数据分片
,将整个数据集划分为多个槽,每个槽分配给一个节点。客户端访问数据时,先计算出数据对应的槽,然后直接连接到该槽所在的节点进行操作。Redis Cluster还提供了自动故障转移
、数据迁移
和扩缩容
等功能,能够比较方便地管理一个大规模的Redis集群。16384
个槽,每个槽都有一个编号(0~16383),集群的每个节点可以负责多个hash槽,客户端访问数据时,先根据key计算出对应的槽编号,然后根据槽编号找到负责该槽的节点,向该节点发送请求。定时删除
在设置Key的过期时间的同时,为该key创建一个定时器
,让定时器在key的过期时间来临时,对key进行删除
。
立即删除能够保证内存被尽快释放。但是这样话对cpu很不友好,因为删除操作会占用cpu时间,如果过期的key很多,删除这些key会占用很多的CPU时间,在CPU时间紧张的情况下,会影响数据的读写操作
惰性删除
当数据到达过期时间时,先不做处理。等到下次访问该数据时,如果数据已经过期,再对数据进行删除
。
这种删除策略问题也很明显,如果内存中有大量过期key。但是一直没人访问,那么数据就不会过期,内存也不会释放,会发生内存泄露。
定期删除
定期删除是对CPU和内存消耗取得一个折中方案。
每隔一段时间执行一次删除过期key操作
。通过限制删除操作的时长
和频率
,来减少删除操作对CPU时间的占用(处理“定时删除”的缺点),
定期删除过期key(处理“惰性删除”的缺点)。
难点:合理设置删除操作的执行时长和执行频率(要根据具体服务器运行情况来定)
redis默认是每隔100ms就随机抽取一些设置了过期时间的key,检查其是否过期,如果过期就删除。
上面说过定期删除可能会导致很多过期key到了时间并没有被删除掉。所以就用到了惰性删除了,也就是说,在你获取某个key的时候,redis会检查一下 ,这个key如果过期了此时就会删除,不会给你返回任何东西。
通过上述两种手段结合起来,保证过期的key一定会被干掉。
但是这样还是有问题的,如果定期删除漏掉了很多过期key,然后你也没及时去查,也就没走惰性删除,此时大量过期key堆积在内存里,导致redis内存快耗尽了,咋整?
下面就轮到redis的内存淘汰机制了。
Redis 在使用的内存空间超过 maxmemory 值
时,会根据淘汰策略对内存进行淘汰。
Redis 4.0 版本以后一共提供了 8 种数据淘汰策略
noevction
: 不淘汰数据。volatile-ttl
在筛选时,会针对设置了过期时间的键值对,根据过期时间的先后进行删除,越早过期的越先被删除。volatile-random
就像它的名称一样,在设置了过期时间的键值对中,进行随机删除。volatile-lru
会使用 LRU 算法筛选设置了过期时间的键值对。volatile-lfu
会使用 LFU 算法选择设置了过期时间的键值对。allkeys-random
策略,从所有键值对中随机选择并删除数据。allkeys-lru
策略,使用 LRU 算法在所有数据中进行筛选。allkeys-lfu
策略,使用 LFU 算法在所有数据中进行筛选。优先使用 allkeys-lru 策略
。这样,可以充分利用 LRU 这一经典缓存算法的优势,把最近最常访问的数据留在缓存中,提升应用的访问性能。如果你的业务数据中有明显的冷热数据区分,我建议你使用 allkeys-lru 策略。数据访问频率相差不大,没有明显的冷热数据区分,建议使用 allkeys-random 策略
,随机选择淘汰的数据就行。置顶的需求
,比如置顶新闻、置顶视频,那么,可以使用 volatile-lru 策略
,同时不给这些置顶数据设置过期时间。这样一来,这些需要置顶的数据一直不会被删除,而其他数据会在过期时根据 LRU 规则进行筛选。是什么?
LRU是Least Recently Used的缩写,即最近最少使用
,是一种常用的页面置换算法, 选择最近最久未使用的数据予以淘汰。
撸代码:
public class LRUCacheDemo<K, V> extends LinkedHashMap<K, V> { /** * 缓存坑位 */ private int capacity; public LRUCacheDemo(int capacity) { super(capacity, 0.75F, true); this.capacity = capacity; } @Override protected boolean removeEldestEntry(Map.Entry<K, V> eldest) { return super.size() > capacity; } public static void main(String[] args) { LRUCacheDemo lruCacheDemo = new LRUCacheDemo(3); lruCacheDemo.put(1, "a"); lruCacheDemo.put(2, "b"); lruCacheDemo.put(3, "c"); System.out.println(lruCacheDemo.keySet()); lruCacheDemo.put(4, "d"); System.out.println(lruCacheDemo.keySet()); lruCacheDemo.put(3, "c"); System.out.println(lruCacheDemo.keySet()); lruCacheDemo.put(3, "c"); System.out.println(lruCacheDemo.keySet()); lruCacheDemo.put(3, "c"); System.out.println(lruCacheDemo.keySet()); lruCacheDemo.put(5, "x"); System.out.println(lruCacheDemo.keySet()); } }
public class LRUCacheDemo { /** * map负责查找,构建一个虚拟的双向链表,它里面安装的就是一个个Node节点,作为数据载体。 * 1.构造一个node节点作为数据载体 */ class Node<K, V> { K key; V value; Node<K, V> prev; Node<K, V> next; public Node() { this.prev = this.next = null; } public Node(K key, V value) { this.key = key; this.value = value; this.prev = this.next = null; } } /** * 2 构建一个虚拟的双向链表,,里面安放的就是我们的Node * * @param <K> * @param <V> */ class DoubleLinkedList<K, V> { Node<K, V> head; Node<K, V> tail; public DoubleLinkedList() { head = new Node<>(); tail = new Node<>(); head.next = tail; tail.prev = head; } // 3. 添加到头 public void addHead(Node<K, V> node) { node.next = head.next; node.prev = head; head.next.prev = node; head.next = node; } // 4.删除节点 public void removeNode(Node<K, V> node) { node.next.prev = node.prev; node.prev.next = node.next; node.prev = null; node.next = null; } // 5.获得最后一个节点 public Node getLast() { return tail.prev; } } private int cacheSize; Map<Integer, Node<Integer, Integer>> map; DoubleLinkedList<Integer, Integer> doubleLinkedList; public LRUCacheDemo(int cacheSize) { // 坑位 this.cacheSize = cacheSize; // 查找 map = new HashMap<>(); doubleLinkedList = new DoubleLinkedList<>(); } public int get(int key) { if (!map.containsKey(key)) { return -1; } Node<Integer, Integer> node = map.get(key); doubleLinkedList.removeNode(node); doubleLinkedList.addHead(node); return node.value; } public void put(int key, int value) { // update if (map.containsKey(key)) { Node<Integer, Integer> node = map.get(key); node.value = value; map.put(key, node); doubleLinkedList.removeNode(node); doubleLinkedList.addHead(node); } else { // 坑位满了 if (map.size() == cacheSize) { Node<Integer, Integer> lastNode = doubleLinkedList.getLast(); map.remove(lastNode.key); doubleLinkedList.removeNode(lastNode); } // 新增一个 Node<Integer, Integer> newNode = new Node<>(key, value); map.put(key, newNode); doubleLinkedList.addHead(newNode); } } public static void main(String[] args) { LRUCacheDemo lruCacheDemo = new LRUCacheDemo(3); lruCacheDemo.put(1, 1); lruCacheDemo.put(2, 2); lruCacheDemo.put(3, 3); System.out.println(lruCacheDemo.map.keySet()); lruCacheDemo.put(4, 1); System.out.println(lruCacheDemo.map.keySet()); lruCacheDemo.put(3, 1); System.out.println(lruCacheDemo.map.keySet()); lruCacheDemo.put(3, 1); System.out.println(lruCacheDemo.map.keySet()); lruCacheDemo.put(3, 1); System.out.println(lruCacheDemo.map.keySet()); lruCacheDemo.put(5, 1); System.out.println(lruCacheDemo.map.keySet()); } }
CAP定理概念:Consistency(指数据的一致性)、Availability(可用性)、Partition tolerance(分区容错性)
- 一致性
在分布式环境中,一致性是指数据在多个副本之间是否能够保持数据一致性的特性
。在一致性的需求下,当一个系统在数据一致的状态下执行更新操作后,应该保证系统的数据仍然处于一致的状态。- 可用性
可用性是指系统提供的服务必须一直处于可用的状态
,对于用户的每一个操作请求总是能够在有限的时间内返回结果
。- 分区容错性
分布式系统在遇到任何网络分区故障的时候,仍然需要能够保证对外提供满足一致性和可用性的服务
,除非是整个网络环境都发生了故障。
我认为CAP是在分布式场景中的理论,如果是单机Redis,那就没有什么分布式可言,P都没有了,还谈什么AP、CP。
redis集群:AP
redis是高并发性,采用异步通知的方式
,当主机宕机时会发现锁丢失,比如:主节点没来的及把刚刚set进来这条数据给从节点,master就挂了,从机上位但从机上无该数据。可从代码层面解决。
zookeeper集群:CP
zookeeper是高一致性,当所有zk服务器都收到消息后,整个过程才算完成。
在单机环境下,可以使用synchronized或Lock来实现。但是在分布式系统中,因为竞争的线程可能不在同一个节点上(同一个jvm中),所以需要一个让所有进程都能访问到的锁来实现,比如redis或者zookeeper来构建;
为什么使用setnx可以实现分布式锁?是什么原理?
Redis是单线程的,在多个客户端同时通过SETNX命令尝试获取锁时,如果返回1表示获取锁成功,否则失败,并且每次只能有一个客户端获取到锁。
使用setnx + Lua 脚本
@RestController public class GoodController { public static final String REDIS_LOCK_KEY = "lockhhf"; @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String serverPort; @GetMapping("/buy_goods") public String buy_Goods() throws Exception{ String value = UUID.randomUUID().toString()+Thread.currentThread().getName(); try{ //setIfAbsent() == setnx 就是如果不存在就新建,同时加上过期时间保证原子性 Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value,10L, TimeUnit.SECONDS) if (Boolean.FALSE.equals(lockFlag)) { return "抢锁失败,┭┮﹏┭┮"; }else { String result = stringRedisTemplate.opsForValue().get("goods:001"); int goodsNumber = result == null ? 0 : Integer.parseInt(result); if (goodsNumber > 0){ int realNumber = goodsNumber - 1; stringRedisTemplate.opsForValue().set("goods:001",realNumber + ""); System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort); return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort; }else { System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort); } return "商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort; } }finally { Jedis jedis = RedisUtils.getJedis(); String script = "if redis.call('get', KEYS[1]) == ARGV[1]"+"then " +"return redis.call('del', KEYS[1])"+"else "+ " return 0 " + "end"; try{ Object result = jedis.eval(script, Collections.singletonList(REDIS_LOCK_KEY), Collections.singletonList(value)); if ("1".equals(result.toString())){ System.out.println("------del REDIS_LOCK_KEY success"); }else { System.out.println("------del REDIS_LOCK_KEY error"); } }finally { if (null != jedis){ jedis.close(); } } } } }
② 方案二:使用setnx + Redis事务
@RestController public class GoodController { public static final String REDIS_LOCK_KEY = "lockhhf"; @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String serverPort; @GetMapping("/buy_goods") public String buy_Goods(){ String value = UUID.randomUUID().toString()+Thread.currentThread().getName(); try{ //setIfAbsent() == setnx 就是如果不存在就新建,同时加上过期时间保证原子性 Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value,10L, TimeUnit.SECONDS); if (Boolean.FALSE.equals(lockFlag)) { return "抢锁失败,┭┮﹏┭┮"; }else { String result = stringRedisTemplate.opsForValue().get("goods:001"); int goodsNumber = result == null ? 0 : Integer.parseInt(result); if (goodsNumber > 0){ int realNumber = goodsNumber - 1; stringRedisTemplate.opsForValue().set("goods:001",realNumber + ""); System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort); return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort; }else { System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort); } return "商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort; } }finally { while (true) { stringRedisTemplate.watch(REDIS_LOCK_KEY); //加事务,乐观锁 if (value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(REDIS_LOCK_KEY))){ stringRedisTemplate.setEnableTransactionSupport(true); stringRedisTemplate.multi();//开始事务 stringRedisTemplate.delete(REDIS_LOCK_KEY); List<Object> list = stringRedisTemplate.exec(); if (list == null) { //如果等于null,就是没有删掉,删除失败,再回去while循环那再重新执行删除 continue; } } //如果删除成功,释放监控器,并且breank跳出当前循环 stringRedisTemplate.unwatch(); break; } } } }
这两个方案在单Redis的情况下可以使用,但是企业很少只会用单个Redis的,一般都是Redis集群,所以在集群环境下会出现Redis异步复制造成的锁丢失,所以建议使用官方推荐解决方案RedLock。
RedLock(红锁)
-Redisson代码案例:
@RestController public class GoodController { public static final String REDIS_LOCK_KEY = "lockhhf"; @Autowired private StringRedisTemplate stringRedisTemplate; @Value("${server.port}") private String serverPort; @Autowired private Redisson redisson; @GetMapping("/buy_goods") public String buy_Goods(){ String value = UUID.randomUUID().toString()+Thread.currentThread().getName(); RLock redissonLock = redisson.getLock(REDIS_LOCK_KEY); redissonLock.lock(); try{ String result = stringRedisTemplate.opsForValue().get("goods:001"); int goodsNumber = result == null ? 0 : Integer.parseInt(result); if (goodsNumber > 0){ int realNumber = goodsNumber - 1; stringRedisTemplate.opsForValue().set("goods:001",realNumber + ""); System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort); return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort; }else { System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort); } return "商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort; }finally { //还在持有锁的状态,并且是当前线程持有的锁再解锁 if (redissonLock.isLocked() && redissonLock.isHeldByCurrentThread()){ redissonLock.unlock(); } } } }
如果不设置内存大小或者设置最大内存大小为0,在64位操作系统不限制内存大小,在32位系统下最多3GB。
一般在生产上推荐Redis设置内存为最大物理内存的四分之三,也就是0.75
修改redis内存设置:
① 通过修改文件配置
② 通过命令修改
③ 什么命令查看redis内存使用情况?
info memory
④ 真要打满了会怎么样? 如果Redis内存使用超出了设置的最大值会怎样?
改改配置,故意把最大值设为1个byte试试
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。