当前位置:   article > 正文

Redis面试题(2023最新版)

redis面试题

文章目录

1 Redis是什么?

  1. Redis是C语言开发的一个开源的高性能键值对(key-value)的内存数据库,可以用做数据库、缓存、消息中间件等。
  2. 它是一种NoSQL(not-only sql,泛指非关系型数据库)的数据库。
  3. 性能优秀,数据在内存中读写速度非常快,支持并发10W QPS。单进程单线程,是线程安全的,采用IO多路复用机制。丰富的数据类型,支持字符串(strings)、哈希(hashes)、列表(lists)、集合(sets)、有序集合(sorted sets)等
  4. 可以将内存中将数据保存在磁盘中,重启时加载。主从复制,哨兵,高可用。可以用作分布式锁, 可以作为消息中间件使用,支持发布订阅

2 Redis 支持哪几种数据类型?

Redis支持多种数据类型,其中比较常见的有五种:

  1. 字符串(String)
  2. 哈希(Hash)
  3. 列表(List)
  4. 集合(Set)
  5. 有序集合(Sorted Set)
    另外,Redis中还支持一些高级的数据类型,如:Streams、Bitmap、Geospatial以及HyperLogLog

先展示一个Redis数据类型和底层数据结构的关系图
在这里插入图片描述

String(字符串)[动态字符串]
String的底层是一个动态字符串。

List(列表类型)[双向链表、压缩表]

  • List对象的编码可以是ziplist或linkedlist,少量数据时使用ziplist。
  • 当List对象同时满足以下两个条件时,List对象采用ziplist编码:
    List对象保存的所有键值对的键和值的字符串长度均小于64字节
    List对象保存的键值数据小于512个

Hash(散列类型)[压缩列表、哈希表]

  • Hash类型对应的数据结构是两种:ziplist(压缩列表),hashtable(哈希表),少了数据时使用ziplist。
  • 当Hash对象同时满足以下两个条件时,Hash对象采用ziplist编码:
    Hash对象保存的所有键值对的键和值的字符串长度均小于64字节
    Hash对象保存的键值数据小于512个

Set(集合类型)[整数数组、哈希表]

  • Set(集合)的底层数据结构有两种实现方式,分别是哈希表(Hash Table)和整数数组(Intset)。选择哪种数据结构取决于集合中元素的特点。

  • 哈希表(Hash Table): 当集合中的元素是较为复杂的字符串类型,或者元素的数量较多时,Redis 会使用哈希表作为集合的底层数据结构。哈希表允许存储更多类型的数据,但在存储和内存使用上相对较大。

  • 整数数组(Intset): 当集合中的元素都是整数类型,且元素数量较少时,Redis 会使用整数数组作为集合的底层数据结构。整数数组可以在内存使用上更加紧凑,从而节省空间。

  • 需要注意的是,Redis 在判断是否使用整数数组还是哈希表时,会根据集合中元素的特点来做出选择。当集合中的元素全部为整数且符合一定条件(比如元素范围在一定区间内)时,会优先选择整数数组,否则会使用哈希表。

SortedSet(有序集合类型,简称zset)[压缩表、跳表]

  • Zset可以是ziplist或zkiplist,采用ziplist编码存储时,每个集合元素使用两个紧挨在一起的压缩列表来存储。当Zset对象满足以下两个条件时,采用ziplist编码:
    ①. Zset保存的元素个数小于128
    ②. Zset元素的成员长度都小于64字节。
  • 如果不满足以上条件任意一个,ziplist就会转化为zkiplist编码。

3.你讲一下压缩表和跳跃表吧?

压缩表
压缩列表实际上类似于一个数组,数组中的每个元素都对应保存一个数据。压缩列表采用一种紧凑的方式存储数据,可以有效地节省内存空间。和数组不同的是,压缩列表在表头有三个字段zlbytes、zltail和zllen,分别表示列表长度列表尾的偏移量列表中entry个数;压缩列表的尾部还有一个zlend,表示列表结束。在压缩列表中,如果要查找定位第一个元素和最后一个元素,可以通过表头三个字段的长度直接定位,时间复杂度是O(1)。而查找其他元素时,就没那么高效了,只能逐个查找,时间复杂是O(N)。
在这里插入图片描述

跳表:
有序链表只能逐一查找元素,导致操作起来非常缓慢,于是就出现了跳表。具体来说,跳表在链表的基础上,增加了多级索引,通过索引位置的几个跳转,实现数据的快速定位。
在这里插入图片描述

最后看一下各自时间复杂度:在这里插入图片描述

4. 整数数组和压缩列表在查找时间复杂度方面并没有很大的优势,那为什么 Redis 还会把它们作为底层数据结构呢?

  • 内存利用率,整数数组和压缩列表的设计,充分体现了 Redis“又快又省”特点中的“省”,也就是节省内存空间。整数数组和压缩列表都是在内存中分配一块地址连续的空间,然后把集合中的元素一个接一个地放在这块空间内,非常紧凑。因为元素是挨个连续放置的,我们不用再通过额外的指针把元素串接起来,这就避免了额外指针带来的空间开销
  • 充分利用CPU缓存,因为数组和压缩列表的内存是连续的,符合程序的局部性原理,就可以充分利用CPU高速缓存,速度会更快。
  • 另外,当数组元素超过阈值时,会自动转为hash和跳表,保证查询效率。

5 三种特殊类型使用过吗?

Redis三种特殊类型:bitmaps,hyperloglogs、地理空间(gepspatial)

6 谈一下Redis事务?

  1. Redis事务本质:一组命令的集合,一个事务中的所有命令都会被序列化,在事务执行过程中,会按照顺序执行
  2. Redis事务没有隔离级别的概念,所有的命令在事务中,并没有直接被执行,只是发起执行命令(Exec)的时候才会执行。
  3. Redis单条命令式保持原子性,但是事务不保证原子性

redis的事务:

  1. 开启事务(multi)
  2. 命令入队(……)
  3. 执行事务(exec)

注意:
发生编译型异常(即命令错误),事务中所有命令都不会被执行。
发生运行时异常,其他命令可以正常执行,错误命令抛出异常。

7 redis是单线程为什么那么快?多线程不行吗?

  • 首先我们要明白,Redis是单线程,主要是指Redis的网络IO和键值对读写是由一个网络来完成的,这也是Redis对外提供键值存储服务的主要流程。但Redis其他功能,比如持久化、异步删除、集群同步等,都是由额外的线程执行的。
  • 假如我们使用的是多线程,那么多线程是能提高系统的吞吐量,但是多线程环境下资源是共享的,所以就会出现多线程编程模式面临的共享资源的并发访问控制问题,即使增加了线程,大部分线程也在等待获取访问共享资源的互斥锁,并行变串行,系统的吞吐量并没有随着线程的增加而增加。

Redis 之所以如此快,主要有以下几个方面的原因:

  1. 基于内存: Redis是一种基于内存的数据库,数据存储在内存中,数据的读写速度非常快,因为内存访问速度比硬盘访问速度快得多。
  2. 单线程模型: Redis使用单线程模型,这意味着它的所有操作都是在一个线程内完成的,不需要进行线程切换和上下文切换。这大大提高了Redis的运行效率和响应速度。
  3. 多路复用I/О模型:Redis 在单线程的基础上,采用了V/O多路复用技术,实现了单个线程同时处理多个客户端连接的能力,从而提高了Redis的并发性能。
  4. 高效的数据结构: Redis 提供了多种高效的数据结构,如哈希表、跳表,这是它能实现高性能的一个重要原因。

8 什么是IO多路复用机制?

  • Scoket非阻塞的实现方式采用了IO多路复用机制,那么在Linux 中的 IO 多路复用机制是指一个线程处理多个 IO 流
  • 下图就是基于多路复用的 Redis IO 模型。图中的多个 FD 就是套接字。Redis 网络框架调用 epoll 机制,让内核监听这些套接字。此时,Redis 线程不会阻塞在某一个特定的监听或已连接套接字上,也就是说,不会阻塞在某一个特定的客户端请求处理上。正因为此,Redis 可以同时和多个客户端连接并处理请求,从而提升并发性。
    在这里插入图片描述

Redis持久化

1 为什么Redis需要持久化?

redis是基于内存的,假设我们不做任何操作,只要redis重启(或者中途故障挂掉了),那么redis的数据就没掉了,那么我们也不想内存中的数据没掉吧,所以redis提供了持久化机制给我们使用,分别是AOF日志和RDB快照。

2 请你谈一下Redis的持久化机制?

Redis持久化机制一共有两种,分别是AOF日志RDB快照

  • 首先是AOF日志:AOF是把所有的Redis收到的所有写命令都记录到日志中,Redis重跑一遍这个日志记录下的日志就相当于还原了数据。
  • RDB快照根据我们自己配置时间或者是手动执行BGSAVE或SAVE命令,Redis就会去生成RDB文件

save:在主线程中执行,会导致阻塞;
bgsave:创建一个子进程,专门用于写入 RDB 文件,避免了主线程的阻塞,这也是 Redis RDB 文件生成的默认配置。

3 那么AOF日志 是如何把数据写回磁盘的呢?

  • AOF有三种写回策略,分别是Always同步写回Everysec每秒写回No手动写回
  • Redis是先将日志写入内存,然后通过写回策略异步写磁盘。
    在这里插入图片描述

4 那么AOF文件越写越多,文件过大,Redis是如何处理的?

这个时候Redis会使用重写机制

AOF重写过程是由后台子进程bgrewriteaof来完成的。总结为“一个拷贝,两处日志”。

  • “一个拷贝”就是指,每次执行重写时,主线程 fork 出后台的 bgrewriteaof 子进程。此时,fork 会把主线程的内存拷贝一份给 bgrewriteaof 子进程,这里面就包含了数据库的最新数据。然后,bgrewriteaof 子进程就可以在不影响主线程的情况下,逐一把拷贝的数据写成操作,记入重写日志。

  • “两处日志”又是什么呢?
    因为主线程未阻塞,仍然可以处理新来的操作。此时,如果有写操作,第一处日志就是指正在使用的 AOF 日志,Redis 会把这个操作写到它的缓冲区。这样一来,即使宕机了,这个 AOF 日志的操作仍然是齐全的,可以用于恢复。而第二处日志,就是指新的 AOF 重写日志,这个操作也会被写到重写日志的缓冲区。这样,重写日志也不会丢失最新的操作。等到拷贝数据的所有操作记录重写完成后,重写日志记录的这些最新操作也会写入新的 AOF 文件,以保证数据库最新状态的记录。此时,我们就可以用新的 AOF 文件替代旧文件了。
    在这里插入图片描述

5 那么你知道RDB快照执行原理吗?

RDB快照:
Redis 的数据都在内存中,为了提供所有数据的可靠性保证,它执行的是全量快照,也就是说,把内存中的所有数据都记录到磁盘中。

Redis 提供了两个命令来生成 RDB 文件,分别是 save 和 bgsave。
save:在主线程中执行,会导致阻塞;
bgsave:创建一个子进程,专门用于写入 RDB 文件,避免了主线程的阻塞,这也是 Redis RDB 文件生成的默认配置。

那么在执行bgsave时,在子线程去执行RDB文件,那么快照时的数据可修改吗?
这个时候主线程没有阻塞,是可以正常进行接收请求的,但是为了保证快照的完整性,他只能处理读操作,不能修改正在执行快照的数据。这显然不合理呀,为了实现主线程可以处理修改数据,所以Redis就会借助操作系统提供的写时复制技术(Copy-On-Write,COW),在执行快照的同时,正常处理写操作。

在这里插入图片描述
那么问题又来了?多久做一次快照合适呢?频繁做全量快照又会带来两方面性能的开销:

  • ①. 频繁将全量数据写入磁盘,会给磁盘带来很大压力,多个快照竞争有限的磁盘带宽,前一个快照还没做完,后一个又开始做了,容易造成恶性循环。
  • ②. 另一方面,bgsave子进程需要通过fork操作从主线程创建出来。显然,子进程在创建后不会再阻塞主线程,但是,fork这个创建过程本身会阻塞主线程,而且主线程的内存越大,阻塞时间越长。

所以我们在做了一次全量快照之后,后面增量快照,只对修改的数据做快照记录,这样避免了全量快照的开销。
但是这么做的前提是我们需要记录修改的信息,需要花费额外的空间去记录这些信息的修改,如果修改信息很大,额外空间的花销也很大,对于内存宝贵的Redis来说,有些得不偿失。

Redis4.0提出了一种混合使用AOF日志和内存快照的方法。简单来说,内存快照以一定的频率执行,在两次快照之间,使用AOF日志记录这期间的所有命令操作
在这里插入图片描述

6 关于RDB快照和AOF日志选择问题?

主要看业务场景吧,那么业务上如果允许重启时部分数据丢失的,那么开启RDB就够了,RDB启动时会比AOF快很多。但是如果是数据是有容忍度的,建议开启RDB和AOF一起用,官方建议也是都开启。

最后关于RDB快照和AOF日志选择问题:

  • ①.数据不能丢失,内存快照和AOF的混合使用是一个很好的选择。
  • ②.如果允许分钟级别的数据丢失,可以只使用RDB
  • ③.如果只用AOF,优先使用everysec的配置选项,因为它在可靠性和性能之间取一个平衡

集群

1 Redis的集群模式有哪些?

Redis有三种主要的集群模式,用于在分布式环境中实现高可用性和数据复制。这些集群模式分别是: 主从复制(Master-Slave Replication)、哨兵模式(Sentinel)和Redis Cluster模式

主从复制:

  • 主从复制是Redis最简单的集群模式。这个模式主要是为了解决单点故障的问题,所以将数据复制多个副本中,这样即使有一台服务器出现故障,其他服务器依然可以继续提供服务。
  • 主从模式中,包括一个主节点 (Master)和一个或多个从节点(Slave)。主节点负责处理所有写操作和读操作,而从节点则复制主节点的数据,并且只能处理读操作。当主节点发生故障时,可以将一个从节点升级为主节点,实现故障转移(需要手动实现)。
    在这里插入图片描述
  • 主从复制的优势在于简单易用,适用于读多写少的场景。它提供了数据备份功能,并且可以有很好的扩展性,只要增加更多的从节点,就能让整个集群的读的能力不断提升。
  • 但是主从模式最大的缺点,就是不具备故障自动转移的能力,没有办法做容错和恢复
  • 主节点和从节点的宕机都会导致客户端部分读写请求失败,需要人工介入让节点恢复或者手动切换一台从节点服务器变成主节点服务器才可以。并且在主节点宕机时,如果数据没有及时复制到从节点,也会导致数据不一致。

哨兵模式:

  • 为了解决主从模式的无法自动容错及恢复的问题,Redis引入了一种哨兵模式的集群架构。
  • 哨兵模式是在主从复制的基础上加入了哨兵节点。哨兵节点是一种特殊的Redis节点,用于监控主节点和从节点的状态。当主节点发生故障时,哨兵节点可以自动进行故障转移,选择一个合适的从节点升级为主节点,并通知其他从节点和应用程序进行更新。
  • 在原来的主从架构中,引入哨兵节点,其作用是监控Redis主节点和从节点的状态。每个Redis实例都可以作为哨兵节点,通常需要部署多个哨兵节点,以确保故障转移的可靠性。
  • 哨兵节点定期向所有主节点和从节点发送PING命令,如果在指定的时间内未收到PONG响应,哨兵节点会将该节点标记为主观下线。如果一个主节点被多数哨兵节点标记为主观下线,那么它将被标记为客观下线。
  • 当主节点被标记为客观下线时,哨兵节点会触发故障转移过程。它会从所有健康的从节点中选举一个新的主节点,并将所有从节点切换到新的主节点,实现自动故障转移。同时,哨兵节点会更新所有客户端的配置,指向新的主节点。
  • 哨兵节点通过发布订阅功能来通知客户端有关主节点状态变化的消息。客户端收到消息后,会更新配置,将新的主节点信息应用于连接池,从而使客户端可以继续与新的主节点进行交互。
  • 这个集群模式的优点就是为整个集群系统了一种故障转移和恢复的能力。

Redis Cluster模式:

  • Redis Cluster是Redis中推荐的分布式集群解决方案。它将数据自动分片到多个节点上,每个节点负责一部分数据。Redis Cluster采用无中心节点的架构,即所有节点都是平等的,没有单点故障。它支持动态扩展和收缩,可以自动处理节点的添加和删除。

在这里插入图片描述

  • Redis Cluster是适用于大规模应用的解决方案,它提供了更好的横向扩展和容错能力。它自动管理数据分片和故障转移,减少了运维的负担。
    Cluster模式的特点是数据分片存储在不同的节点上,每个节点都可以单独对外提供读写服务。不存在单点故障的问题。
  • Cluster模式的特点是数据分片存储在不同的节点上,每个节点都可以单独对外提供读写服务。不存在单点故障的问题。

2 那说一下主从复制的全量同步?

在这里插入图片描述
主从库第一同步流程:

  • ①.从库执行执行replicaof 172,16,19.3 6379
  • ②.从库主库建立连接,从库发送psync命令,表示进行数据同步,第一次同步所以从机不知道主机的runID,所以发送一个?,-1标识是offset表示第一次同步
  • ③.主机发送FULLRESYNC给从机,表示第一次全量复制,并把主库runID和和主库目前复制进度offset发送给从机
  • ④.主库会发送RDB文件给从库,从库收到RDB文件之后,会清空所有数据,然后在加载RDB文件
  • ⑤.在主从同步期间,主机不会被阻塞,仍然可以正常接收请求,接收到的请求会存在replication buffer中,等RDB文件发送完成后,主机会把replication buffer文件发送给从库。

3 那你说一下主从复制的增量同步?主从网络断了怎么办?

主从连接的网络断了,Redis的处理方式是使用主从的增量同步。有一个环形缓冲区repl_backing_buffer,当主从断开连接,主机会把断开连接期间的数据写入 replication buffer 和repl_backing_buffer中,在repl_backing_buffer主机会用偏移量master_repl_offerset记录自己写的位置,从机也会使用slave_repl_offerset记录自己读到的位置。
在这里插入图片描述
从机重新连接,从机会发送psync 我主机runID,偏移量offset给主机,主机会从repl_backing_buffer读取断开连接期间的数据。
在这里插入图片描述
思考一个问题?repl_backing_buffer是一个环形缓冲区,如果缓冲区写满之后,主库会继续写入,就会覆盖掉之前写入的操作。所以还需要一个参数来控制,repl_backing_size缓冲区大小,如果写入的数据超过缓冲区大小,那就会进行全量同步。

4 那么Redis在主从复制的和故障转移的过程中会导致数据丢失吗

  • 很显然是会的,从【主从复制】流程来看,这个过程是异步的(在复制的过程中:主服务器会一直接收请求,然后把命令发送给从服务器),假如主服务器的命令还没有发送完给从服务器,自己就挂了。这个时候要让从服务器顶替主服务器,但从服务器的数据是不全的。

  • 还有另外一种情况就是:有可能哨兵认为主服务器挂了,但是真实的主服务器其实并没有挂(网络抖动),而哨兵已经选取了一台服务器当做主服务器,此时客户端还没有反应过来,还继续向旧服务器写数据。等到旧服务器连接的时候,已经被纳入从服务器了,所以那段时间里,写的数据就丢失了。

5 哨兵模式说一下?

  • 哨兵模式。哨兵主干的事情主要就是:监控(监控主服务器的状态)、选主(主服务器挂了,在从服务器选出一个作为主服务器)、通知(故障发送消息给管理员)和配置(作为配置中心,提供当前主服务器的信息)。

  • 可以把「哨兵」当做是运行在「特殊」模式下的Redis服务器,为了「高可用」,哨兵也是集群架构的。首先它需要跟Redis主服务器创建对应连接(获取他们的信息)。

  • 每个哨兵都会不断的ping命令看主服务器有没有下线,如果主服务器在【配置时间】内没有正常响应,那么当前哨兵就主观的认为该服务器下线;其他哨兵同样也会ping该主服务器,如果足够多的哨兵认为该主服务器已经下线,那就认为【客观下线】,这时就需要对主服务器执行故障转移。

  • 哨兵之间也会挑选出来一个领头, 由领头对已下线的服务器进行故障转移。

  • 首先会在从服务器上挑选一个来作为主服务器,这里挑选是按照,比如:(从库配置优先级、要判断哪个服务器的复制进度【offset】最大、【RUNID】大小、跟master断开的连接时长等)。

  • 然后,以前的从服务器都需要跟新的主服务器进行主从复制。已经下线的主服务器,再次重新连接的时候,需要让他成为新的主服务器的从服务器。

6 Redis数据分片你知道吗?

  • Redis的数据分片(sharding)是一种将一个Redis数据集分割成多个部分分别存储在不同的Redis节点上的技术。它可以用于将一个单独的Redis数据库扩展到多个物理机器上,从而提高Redis集群的性能和可扩展性。
  • 在Redis的Cluster集群模式中,使用哈希槽(hash slot)的方式来进行数据分片,将整个数据集划分为多个槽,每个槽分配给一个节点。客户端访问数据时,先计算出数据对应的槽,然后直接连接到该槽所在的节点进行操作。Redis Cluster还提供了自动故障转移数据迁移扩缩容等功能,能够比较方便地管理一个大规模的Redis集群。
  • Redis Cluster将整个数据集划分为16384个槽,每个槽都有一个编号(0~16383),集群的每个节点可以负责多个hash槽,客户端访问数据时,先根据key计算出对应的槽编号,然后根据槽编号找到负责该槽的节点,向该节点发送请求。
  • 在Redis 的每一个节点上,都有这么两个东西,一个是槽(slot),它的的取值范围是:0-16383。还有一个就是cluster,可以理解为是一个集群管理的插件。当我们的存取的Key的时候,Redis 会根据CRC16算法得出一个结果,然后把结果对16384-求余数,这样每个key都会对应一个编号在0-16383之间的哈希槽,通过这个值,去找到对应的插槽所对应的节点,然后直接自动跳转到这个对应的节点上进行存取操作。

淘汰机制

1 如果假设你设置一批key只能存活1个小时,那么接下来1小时后,redis是怎么对这批key进行删除的?

  1. 定时删除
    在设置Key的过期时间的同时,为该key创建一个定时器让定时器在key的过期时间来临时,对key进行删除
    立即删除能够保证内存被尽快释放。但是这样话对cpu很不友好,因为删除操作会占用cpu时间,如果过期的key很多,删除这些key会占用很多的CPU时间,在CPU时间紧张的情况下,会影响数据的读写操作

  2. 惰性删除
    当数据到达过期时间时,先不做处理。等到下次访问该数据时,如果数据已经过期,再对数据进行删除
    这种删除策略问题也很明显,如果内存中有大量过期key。但是一直没人访问,那么数据就不会过期,内存也不会释放,会发生内存泄露。

  3. 定期删除
    定期删除是对CPU和内存消耗取得一个折中方案。
    每隔一段时间执行一次删除过期key操作。通过限制删除操作的时长频率,来减少删除操作对CPU时间的占用(处理“定时删除”的缺点),
    定期删除过期key(处理“惰性删除”的缺点)。
    难点:合理设置删除操作的执行时长和执行频率(要根据具体服务器运行情况来定)

2 redis使用的哪种过期策略?

redis默认是每隔100ms就随机抽取一些设置了过期时间的key,检查其是否过期,如果过期就删除。

上面说过定期删除可能会导致很多过期key到了时间并没有被删除掉。所以就用到了惰性删除了,也就是说,在你获取某个key的时候,redis会检查一下 ,这个key如果过期了此时就会删除,不会给你返回任何东西。

通过上述两种手段结合起来,保证过期的key一定会被干掉。

但是这样还是有问题的,如果定期删除漏掉了很多过期key,然后你也没及时去查,也就没走惰性删除,此时大量过期key堆积在内存里,导致redis内存快耗尽了,咋整?

下面就轮到redis的内存淘汰机制了。

3 redis内存淘汰策略

Redis 在使用的内存空间超过 maxmemory 值时,会根据淘汰策略对内存进行淘汰。
Redis 4.0 版本以后一共提供了 8 种数据淘汰策略
在这里插入图片描述

  • noevction: 不淘汰数据。
  • volatile-ttl 在筛选时,会针对设置了过期时间的键值对,根据过期时间的先后进行删除,越早过期的越先被删除。
  • volatile-random 就像它的名称一样,在设置了过期时间的键值对中,进行随机删除。
  • volatile-lru 会使用 LRU 算法筛选设置了过期时间的键值对。
  • volatile-lfu 会使用 LFU 算法选择设置了过期时间的键值对。
  • allkeys-random 策略,从所有键值对中随机选择并删除数据。
  • allkeys-lru 策略,使用 LRU 算法在所有数据中进行筛选。
  • allkeys-lfu 策略,使用 LFU 算法在所有数据中进行筛选。

4 那你你们平时使用那种内存淘汰策略呢?

  • 优先使用 allkeys-lru 策略。这样,可以充分利用 LRU 这一经典缓存算法的优势,把最近最常访问的数据留在缓存中,提升应用的访问性能。如果你的业务数据中有明显的冷热数据区分,我建议你使用 allkeys-lru 策略。
  • 如果业务应用中的数据访问频率相差不大,没有明显的冷热数据区分,建议使用 allkeys-random 策略,随机选择淘汰的数据就行。
  • 如果你的业务中有置顶的需求,比如置顶新闻、置顶视频,那么,可以使用 volatile-lru 策略,同时不给这些置顶数据设置过期时间。这样一来,这些需要置顶的数据一直不会被删除,而其他数据会在过期时根据 LRU 规则进行筛选。

5 redis的LRU了解过吗? 可否手写一个LRU算法

是什么?
LRU是Least Recently Used的缩写,即最近最少使用,是一种常用的页面置换算法, 选择最近最久未使用的数据予以淘汰。

撸代码:

  1. 依赖JDK (LinkedHashMap)
    写法一:
public class LRUCacheDemo<K, V> extends LinkedHashMap<K, V> {

    /**
     * 缓存坑位
     */
    private int capacity;

    public LRUCacheDemo(int capacity) {
        super(capacity, 0.75F, true);
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return super.size() > capacity;
    }

    public static void main(String[] args) {
        LRUCacheDemo lruCacheDemo = new LRUCacheDemo(3);

        lruCacheDemo.put(1, "a");
        lruCacheDemo.put(2, "b");
        lruCacheDemo.put(3, "c");
        System.out.println(lruCacheDemo.keySet());

        lruCacheDemo.put(4, "d");
        System.out.println(lruCacheDemo.keySet());

        lruCacheDemo.put(3, "c");
        System.out.println(lruCacheDemo.keySet());
        lruCacheDemo.put(3, "c");
        System.out.println(lruCacheDemo.keySet());
        lruCacheDemo.put(3, "c");
        System.out.println(lruCacheDemo.keySet());
        lruCacheDemo.put(5, "x");
        System.out.println(lruCacheDemo.keySet());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  1. 不依赖JDK
public class LRUCacheDemo {

    /**
     * map负责查找,构建一个虚拟的双向链表,它里面安装的就是一个个Node节点,作为数据载体。
     * 1.构造一个node节点作为数据载体
     */
    class Node<K, V> {
        K key;
        V value;
        Node<K, V> prev;
        Node<K, V> next;

        public Node() {
            this.prev = this.next = null;
        }

        public Node(K key, V value) {
            this.key = key;
            this.value = value;
            this.prev = this.next = null;
        }

    }

    /**
     * 2 构建一个虚拟的双向链表,,里面安放的就是我们的Node
     *
     * @param <K>
     * @param <V>
     */
    class DoubleLinkedList<K, V> {
        Node<K, V> head;
        Node<K, V> tail;

        public DoubleLinkedList() {
            head = new Node<>();
            tail = new Node<>();
            head.next = tail;
            tail.prev = head;
        }

        // 3. 添加到头
        public void addHead(Node<K, V> node) {
            node.next = head.next;
            node.prev = head;
            head.next.prev = node;
            head.next = node;
        }

        // 4.删除节点
        public void removeNode(Node<K, V> node) {
            node.next.prev = node.prev;
            node.prev.next = node.next;
            node.prev = null;
            node.next = null;
        }

        // 5.获得最后一个节点
        public Node getLast() {
            return tail.prev;
        }
    }

    private int cacheSize;
    Map<Integer, Node<Integer, Integer>> map;
    DoubleLinkedList<Integer, Integer> doubleLinkedList;

    public LRUCacheDemo(int cacheSize) {
        // 坑位
        this.cacheSize = cacheSize;
        // 查找
        map = new HashMap<>();
        doubleLinkedList = new DoubleLinkedList<>();
    }

    public int get(int key) {
        if (!map.containsKey(key)) {
            return -1;
        }

        Node<Integer, Integer> node = map.get(key);
        doubleLinkedList.removeNode(node);
        doubleLinkedList.addHead(node);

        return node.value;
    }

    public void put(int key, int value) {
        // update
        if (map.containsKey(key)) {
            Node<Integer, Integer> node = map.get(key);
            node.value = value;
            map.put(key, node);

            doubleLinkedList.removeNode(node);
            doubleLinkedList.addHead(node);
        } else {
            // 坑位满了
            if (map.size() == cacheSize) {
                Node<Integer, Integer> lastNode = doubleLinkedList.getLast();
                map.remove(lastNode.key);
                doubleLinkedList.removeNode(lastNode);
            }

            // 新增一个
            Node<Integer, Integer> newNode = new Node<>(key, value);
            map.put(key, newNode);
            doubleLinkedList.addHead(newNode);
        }
    }

    public static void main(String[] args) {

        LRUCacheDemo lruCacheDemo = new LRUCacheDemo(3);

        lruCacheDemo.put(1, 1);
        lruCacheDemo.put(2, 2);
        lruCacheDemo.put(3, 3);
        System.out.println(lruCacheDemo.map.keySet());

        lruCacheDemo.put(4, 1);
        System.out.println(lruCacheDemo.map.keySet());

        lruCacheDemo.put(3, 1);
        System.out.println(lruCacheDemo.map.keySet());
        lruCacheDemo.put(3, 1);
        System.out.println(lruCacheDemo.map.keySet());
        lruCacheDemo.put(3, 1);
        System.out.println(lruCacheDemo.map.keySet());
        lruCacheDemo.put(5, 1);
        System.out.println(lruCacheDemo.map.keySet());

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134

应用场景

1 请你讲一下缓存穿透、缓存击穿、缓存雪崩?

在这里插入图片描述

2 为什么Redis是AP,Zookeeper是CP?

CAP定理概念:Consistency(指数据的一致性)、Availability(可用性)、Partition tolerance(分区容错性)

  1. 一致性
    在分布式环境中,一致性是指数据在多个副本之间是否能够保持数据一致性的特性。在一致性的需求下,当一个系统在数据一致的状态下执行更新操作后,应该保证系统的数据仍然处于一致的状态。
  2. 可用性
    可用性是指系统提供的服务必须一直处于可用的状态,对于用户的每一个操作请求总是能够在有限的时间内返回结果
  3. 分区容错性
    分布式系统在遇到任何网络分区故障的时候,仍然需要能够保证对外提供满足一致性和可用性的服务,除非是整个网络环境都发生了故障。
  1. redis单机:CP

我认为CAP是在分布式场景中的理论,如果是单机Redis,那就没有什么分布式可言,P都没有了,还谈什么AP、CP。

  1. redis集群:AP
    redis是高并发性,采用异步通知的方式,当主机宕机时会发现锁丢失,比如:主节点没来的及把刚刚set进来这条数据给从节点,master就挂了,从机上位但从机上无该数据。可从代码层面解决。

  2. zookeeper集群:CP
    zookeeper是高一致性,当所有zk服务器都收到消息后,整个过程才算完成。
    在这里插入图片描述

在这里插入图片描述

3 为什么要分布式锁?

在单机环境下,可以使用synchronized或Lock来实现。但是在分布式系统中,因为竞争的线程可能不在同一个节点上(同一个jvm中),所以需要一个让所有进程都能访问到的锁来实现,比如redis或者zookeeper来构建;

4 知道分布式锁吗?有哪些实现方案? 你谈谈对redis分布式锁的理解?

  1. 使用setnx实现分布式锁
  2. 使用Redission实现分布式锁

为什么使用setnx可以实现分布式锁?是什么原理?
Redis是单线程的,在多个客户端同时通过SETNX命令尝试获取锁时,如果返回1表示获取锁成功,否则失败,并且每次只能有一个客户端获取到锁。

  1. 单Redis环境下:
    ① 方案一:使用setnx + Lua 脚本
@RestController
public class GoodController {

    public static final String REDIS_LOCK_KEY = "lockhhf";

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Value("${server.port}")
    private String serverPort;

    @GetMapping("/buy_goods")
    public String buy_Goods() throws Exception{

        String value = UUID.randomUUID().toString()+Thread.currentThread().getName();

        try{
            //setIfAbsent() == setnx 就是如果不存在就新建,同时加上过期时间保证原子性
            Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value,10L, TimeUnit.SECONDS)
            if (Boolean.FALSE.equals(lockFlag)) {
                return "抢锁失败,┭┮﹏┭┮";
            }else {
                String result = stringRedisTemplate.opsForValue().get("goods:001");
                int goodsNumber = result == null ? 0 : Integer.parseInt(result);

                if (goodsNumber > 0){
                    int realNumber = goodsNumber - 1;
                    stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
                    System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort);
                    return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort;
                }else {
                    System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort);
                }
                return "商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort;
            }
        }finally {
            Jedis jedis = RedisUtils.getJedis();

            String script = "if redis.call('get', KEYS[1]) == ARGV[1]"+"then "
                    +"return redis.call('del', KEYS[1])"+"else "+ "  return 0 " + "end";
                try{
                    Object result = jedis.eval(script, Collections.singletonList(REDIS_LOCK_KEY), Collections.singletonList(value));
                    if ("1".equals(result.toString())){
                        System.out.println("------del REDIS_LOCK_KEY success");
                    }else {
                        System.out.println("------del REDIS_LOCK_KEY error");
                    }
                    }finally {
                        if (null != jedis){
                            jedis.close();
                        }
                    }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

② 方案二:使用setnx + Redis事务

@RestController
public class GoodController {

    public static final String REDIS_LOCK_KEY = "lockhhf";

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Value("${server.port}")
    private String serverPort;

    @GetMapping("/buy_goods")
    public String buy_Goods(){

        String value = UUID.randomUUID().toString()+Thread.currentThread().getName();

        try{
            //setIfAbsent() == setnx 就是如果不存在就新建,同时加上过期时间保证原子性
            Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value,10L, TimeUnit.SECONDS);
            if (Boolean.FALSE.equals(lockFlag)) {
                return "抢锁失败,┭┮﹏┭┮";
            }else {
                String result = stringRedisTemplate.opsForValue().get("goods:001");
                int goodsNumber = result == null ? 0 : Integer.parseInt(result);
                if (goodsNumber > 0){
                    int realNumber = goodsNumber - 1;
                    stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
                    System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort);
                    return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort;
                }else {
                    System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort);
                }
                return "商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort;
            }
        }finally {
            while (true)
            {
                stringRedisTemplate.watch(REDIS_LOCK_KEY); //加事务,乐观锁
                if (value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(REDIS_LOCK_KEY))){
                    stringRedisTemplate.setEnableTransactionSupport(true);
                    stringRedisTemplate.multi();//开始事务
                    stringRedisTemplate.delete(REDIS_LOCK_KEY);
                    List<Object> list = stringRedisTemplate.exec();
                    if (list == null) {  //如果等于null,就是没有删掉,删除失败,再回去while循环那再重新执行删除
                        continue;
                    }
                }
                //如果删除成功,释放监控器,并且breank跳出当前循环
                stringRedisTemplate.unwatch();
                break;
            }
        } 
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

这两个方案在单Redis的情况下可以使用,但是企业很少只会用单个Redis的,一般都是Redis集群,所以在集群环境下会出现Redis异步复制造成的锁丢失,所以建议使用官方推荐解决方案RedLock。

  1. Redis集群环境下:
    可以使用官方解决方案RedLock(红锁)-Redisson
    Distributed locks with Redis – Redis

代码案例:

@RestController
public class GoodController {

    public static final String REDIS_LOCK_KEY = "lockhhf";

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Value("${server.port}")
    private String serverPort;

    @Autowired
    private Redisson redisson;

    @GetMapping("/buy_goods")
    public String buy_Goods(){

        String value = UUID.randomUUID().toString()+Thread.currentThread().getName();

        RLock redissonLock = redisson.getLock(REDIS_LOCK_KEY);
        redissonLock.lock();
        try{
            String result = stringRedisTemplate.opsForValue().get("goods:001");
            int goodsNumber = result == null ? 0 : Integer.parseInt(result);

            if (goodsNumber > 0){
                int realNumber = goodsNumber - 1;
                stringRedisTemplate.opsForValue().set("goods:001",realNumber + "");
                System.out.println("你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort);
                return "你已经成功秒杀商品,此时还剩余:" + realNumber + "件"+"\t 服务器端口: "+serverPort;
            }else {
                System.out.println("商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort);
            }
            return "商品已经售罄/活动结束/调用超时,欢迎下次光临"+"\t 服务器端口: "+serverPort;

        }finally {
            //还在持有锁的状态,并且是当前线程持有的锁再解锁
            if (redissonLock.isLocked() && redissonLock.isHeldByCurrentThread()){
                redissonLock.unlock();
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

Redission的watch dog机制是如何实现的?

5 redis默认内存多少?在哪里查看? 如何设置修改?

  • 如果不设置内存大小或者设置最大内存大小为0,在64位操作系统不限制内存大小,在32位系统下最多3GB。

  • 一般在生产上推荐Redis设置内存为最大物理内存的四分之三,也就是0.75

  • 修改redis内存设置:

① 通过修改文件配置
在这里插入图片描述
② 通过命令修改
在这里插入图片描述
③ 什么命令查看redis内存使用情况?
info memory
④ 真要打满了会怎么样? 如果Redis内存使用超出了设置的最大值会怎样?
改改配置,故意把最大值设为1个byte试试
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/661876
推荐阅读
相关标签
  

闽ICP备14008679号