赞
踩
基于内存存储的,NoSQL数据库(非关系型数据库),存储结构:key-value,Redis是一个开放源代码(BSD许可)内存中的数据结构存储用作数据库丶缓存和消息代理。对于数据量多,数据交互效率要求高的场景,可以考虑使用Redis。
Redis:开源、免费、高性能、K-V数据库、内存数据库、非关系型数据库,支持持久化、集群和事务。目前为止Redis支持的键值数据类型如下:
环境:Ubunto 16.04
输入命令 apt-get install redis-server
输入y 确认安装并使用空间
安装完成后,使用service redis status 可以查看redis服务的状态为active(running),说明安装完成系统自动启动了服务
使用ps -aux|grep redis命令可以看到服务器系统进程默认端口6379
环境:已安装GCC,Ubunto 16.04,redis-4.0.11.tar.gz,使用Xshell工具上传Redis的tar包文件
GNU编译器集合(GCC
)是C,C ++,Objective-C,Fortran,Ada,Go
和D编程语言的编译器和库的集合。许多开源项目包括GNU工具和Linux内核都是用GCC编译的。默认的Ubuntu
存储库包含一个名为build-essential
的元包,它包含GCC
编译器以及编译软件所需的许多库和其他实用程序。
首先更新包列表:apt update
安装build-essential软件包: apt install build-essential,该命令将安装一堆新包,包括gcc,g ++和make
使用gcc --version命令打印GCC版本:gcc --version
解压:tar -zxvf redis-4.0.11.tar.gz
复制:mv redis-4.0.11 /usr/local/redis
进入redis目录:cd /usr/local/redis/
编译,并使用标准的libc中的内存管理函数,必须进入Redis的安装目录才能执行该命令:make MALLOC=libc
安装,并选择安装目录:make PREFIX=/usr/local/redis install
查看安装目录下的文件,cd /usr/local/redis/bin/
开启Redis的服务端,./redis-server
开启Redis的客户端./redis-cli
docker pull redis
docker run -d -p 6379:6379 --name myredis
1、建议全部大写
2、key不能太长也不能太短,键名越长越占资源,太短可读性太差
3、key 单词与单词之间以:
分开,:
在redis中表示为命名空间
4、按照“业务类型:id:字段
”的方式进行命名
开发业务实践
1、在使用Redis进行数据缓存时,往往数据量是比较大的,若直接以普通键值对:key:value
存储,就会显得比较乱,数据分类不明显,不易于查看和查找数据
2、可以采取以命名空间开头的方式存储数据,使不同类型的数据统一放到一个命名空间下,方便查找
如何以命名空间分组呢?其实很简单,只用在存储数据时,键值对中的键命名以:
(冒号)分开即可。
命名空间 : key
。例如,user:username,user:sex。
如果使用了两个冒号,则会在命名空间下再创建一个无名称的“文件夹”,如下图:
redis
存储的是:key
,value
格式的数据,其中key
都是字符串,value
有5种不同的数据结构
value的数据结构:
字符串类型数据不能重复,若
对同一个key多次赋值,则会被覆盖
存储 : set key value
127.0.0.1:6379> set username zhangsan
OK
获取 : get key
127.0.0.1:6379> get username
"zhangsan"
删除 : del key
127.0.0.1:6379> del age
(integer) 1
哈希中存储的是map集合
存储 : hset key field value
127.0.0.1:6379> hset myhash username lisi
(integer) 1
127.0.0.1:6379> hset myhash password 123
(integer) 1
获取 : hget key field
: 获取指定的field
对应的值
127.0.0.1:6379> hget myhash username
"lisi"
hgetall key
: 获取所有的field和value
127.0.0.1:6379> hgetall myhash
1) "username"
2) "lisi"
3) "password"
4) "123"
删除 : hdel key field
127.0.0.1:6379> hdel myhash username
(integer) 1
列表类型相当于一个队列,可以添加一个元素到列表的头部(左边)或尾部(右边);列表中的元素允许重复
可以添加一个元素到列表的头部(左边)或者尾部(右边)
添加:
127.0.0.1:6379> lpush myList a
(integer) 1
127.0.0.1:6379> lpush myList b
(integer) 2
127.0.0.1:6379> rpush myList c
(integer) 3
获取:
lrange key start end
: 范围获取127.0.0.1:6379> lrange myList 0 -1
1) "b"
2) "a"
3) "c"
删除:
集合中不允许元素重复,也不保证顺序
存储 : sadd key value
127.0.0.1:6379> sadd myset a
(integer) 1
127.0.0.1:6379> sadd myset a
(integer) 0
获取 : smembers key
: 获取set集合中所有元素
127.0.0.1:6379> smembers myset
1) "a"
删除 : srem key value
: 删除set集合中的某个元素
127.0.0.1:6379> srem myset a
(integer) 1
不允许重复元素,且元素有顺序.每个元素都会关联一个
double
类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。
存储 : zadd key score value
127.0.0.1:6379> zadd mysort 60 zhangsan
(integer) 1
127.0.0.1:6379> zadd mysort 50 lisi
(integer) 1
127.0.0.1:6379> zadd mysort 80 wangwu
(integer) 1
获取 : zrange key start end [withscores]
127.0.0.1:6379> zrange mysort 0 -1
1) "lisi"
2) "zhangsan"
3) "wangwu"
127.0.0.1:6379> zrange mysort 0 -1 withscores
1) "zhangsan"
2) "60"
3) "wangwu"
4) "80"
5) "lisi"
6) "500"
删除 : zrem key value
127.0.0.1:6379> zrem mysort lisi
(integer) 1
redis
是一个内存数据库,当redis服务器重启,获取电脑重启,数据会丢失,我们可以将redis内存中的数据持久化保存到硬盘的文件中。
RDB
: 默认方式,不需要进行配置,默认就使用这种机制,在一定的间隔时间中,检测key
的变化情况,然后持久化数据
# after 900 sec (15 min) if at least 1 key changed
save 900 1
# after 300 sec (5 min) if at least 10 keys changed
save 300 10
# after 60 sec if at least 10000 keys changed
save 60 10000
./redis-server redis.conf
AOF
: 日志记录的方式,可以记录每一条命令的操作。可以每一次命令操作后,持久化数据
appendonly no(关闭aof) --> appendonly yes (开启aof)
# appendfsync always : 每一次操作都进行持久化
appendfsync everysec : 每隔一秒进行一次持久化
# appendfsync no : 不进行持久化
Jedis
: 一款java
操作redis
数据库的工具. 使用步骤:
//1. 获取连接
Jedis jedis = new Jedis("localhost",6379);
//2. 操作
jedis.set("username","zhangsan");
//3. 关闭连接
jedis.close();
注意哦:
redis.conf
,注释# bind 127.0.0.1
,并一配置文件启动,否则后台连接超时。String host="192.168.40.137";
int prot=6379;
Jedis jedis = new Jedis(host,prot);
jedis.auth("zysheep");
jedis.set("strName","李四");
System.out.println("strName的key:"+jedis.get("strName"));
System.out.println(jedis.ping()); //连接测试
string
//1. 获取连接
Jedis jedis = new Jedis();//如果使用空参构造,默认值 "localhost",6379端口
//2. 操作
//存储
jedis.set("username","zhangsan");
//获取
String username = jedis.get("username");
System.out.println(username);
//可以使用setex()方法存储可以指定过期时间的 key value
jedis.setex("activecode",20,"hehe");//将activecode:hehe键值对存入redis,并且20秒后自动删除该键值对
//3. 关闭连接
jedis.close();
//1. 获取连接
Jedis jedis = new Jedis();//如果使用空参构造,默认值 "localhost",6379端口
//2. 操作
// 存储hash
jedis.hset("user","name","lisi");
jedis.hset("user","age","23");
jedis.hset("user","gender","female");
// 获取hash
String name = jedis.hget("user", "name");
System.out.println(name);
// 获取hash的所有map中的数据
Map<String, String> user = jedis.hgetAll("user");
// keyset
Set<String> keySet = user.keySet();
for (String key : keySet) {
//获取value
String value = user.get(key);
System.out.println(key + ":" + value);
}
//3. 关闭连接
jedis.close();
//1. 获取连接
Jedis jedis = new Jedis();//如果使用空参构造,默认值 "localhost",6379端口
//2. 操作
// list 存储
jedis.lpush("mylist","a","b","c");//从左边存
jedis.rpush("mylist","a","b","c");//从右边存
// list 范围获取
List<String> mylist = jedis.lrange("mylist", 0, -1);
System.out.println(mylist);
// list 弹出
String element1 = jedis.lpop("mylist");//c
System.out.println(element1);
String element2 = jedis.rpop("mylist");//c
System.out.println(element2);
// list 范围获取
List<String> mylist2 = jedis.lrange("mylist", 0, -1);
System.out.println(mylist2);
//3. 关闭连接
jedis.close();
//1. 获取连接
Jedis jedis = new Jedis();//如果使用空参构造,默认值 "localhost",6379端口
//2. 操作
// set 存储
jedis.sadd("myset","java","php","c++");
// set 获取
Set<String> myset = jedis.smembers("myset");
System.out.println(myset);
//3. 关闭连接
jedis.close();
//1. 获取连接
Jedis jedis = new Jedis();//如果使用空参构造,默认值 "localhost",6379端口
//2. 操作
// sortedset 存储
jedis.zadd("mysortedset",3,"亚瑟");
jedis.zadd("mysortedset",30,"后裔");
jedis.zadd("mysortedset",55,"孙悟空");
// sortedset 获取
Set<String> mysortedset = jedis.zrange("mysortedset", 0, -1);
System.out.println(mysortedset);
//3. 关闭连接
jedis.close();
使用:
//0.创建一个配置对象
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(50);
config.setMaxIdle(10);
//1.创建Jedis连接池对象
JedisPool jedisPool = new JedisPool(config,"localhost",6379);
//2.获取连接
Jedis jedis = jedisPool.getResource();
//3. 使用
jedis.set("hehe","heihei");
//4. 关闭 归还到连接池中
jedis.close();
jedis.properties
host=127.0.0.1 # redis服务器ip地址
port=6379 #端口
maxTotal=50 # 最大连接数
maxIdle=10 # 空闲连接数
public class JedisPoolUtils {
private static JedisPool jedisPool;
static{
//读取配置文件
InputStream is = JedisPoolUtils.class.getClassLoader().getResourceAsStream("jedis.properties");
//创建Properties对象
Properties pro = new Properties();
//关联文件
try {
pro.load(is);
} catch (IOException e) {
e.printStackTrace();
}
//获取数据,设置到JedisPoolConfig中
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(Integer.parseInt(pro.getProperty("maxTotal")));
config.setMaxIdle(Integer.parseInt(pro.getProperty("maxIdle")));
//初始化JedisPool
jedisPool = new JedisPool(config,pro.getProperty("host"),Integer.parseInt(pro.getProperty("port")));
}
/**
* 获取连接方法
*/
public static Jedis getJedis(){
return jedisPool.getResource();
}
}
redis事务提供了一种“将多个命令打包, 然后一次性、按顺序地执行”的机制, 并且事务在执行的期间不会主动中断 —— 服务器在执行完事务中的所有命令之后, 才会继续处理其他客户端的其他命令。
Redis中的事务是可以视为一个队列,即我们可以通过MULTI开始一个事务,这相当于我们声明了一个命令队列。
事务是一个原子操作,事物中的命令只有两种执行结果,即全部执行或者全部不执行。如果客户端在使用MULTI命令开启事务后因为意外而没有执行EXEC命令,则事务中的所有命令都不会执行。同理,如果客户端在使用MULTI命令开启事务后执行EXEC命令,则事务中的所有命令都会执行。
Redis中的事务可以使用DISCARD命令来清空一个命令队列,并放弃对事务的执行。如果命令在入队时发生错误,Redis将在客户端调用EXEC命令时拒绝执行并取消事务,但是在EXEC命令执行后发生的错误,Redis将选择自动忽略。
方式一
@SpringBootTest
class Springboot08RedisApplicationTests {
@Autowired
StringRedisTemplate stringRedisTemplate; //操作k-v都是字符串的
@Autowired
private RedisTemplate redisTemplate; //k-v都是对象的
/**
* redis事务
* multi :标记一个事务块的开始。
* exec : 执行所有事务块的命令
* discard : 取消事务,放弃执行事务块内的所有命令
* watch : Redis Watch 命令用于监视一个(或多个) key,如果在事务执行之前这个(或这些) key 被其他命令所改动,
* 那么事务将被打断
*/
@Test
public void redisTransaction1() {
// 开启事务支持,在同一个 Connection 中执行命令
redisTemplate.setEnableTransactionSupport(true);
redisTemplate.multi();
redisTemplate.opsForHash().put("map","username","封于修");
redisTemplate.opsForHash().put("map","age",20);
redisTemplate.opsForHash().put("map","sex","男");
redisTemplate.opsForHash().put("map","weight",80);
System.out.println(redisTemplate.exec());
}
}
方式二: 推荐使用
/**
* 执行事务方式二
*/
@Test
public void redisTransaction2(){
redisTemplate.execute(new SessionCallback<List<Object>>(){
@Override
public List<Object> execute(RedisOperations operations) throws DataAccessException {
operations.multi();
operations.opsForValue().set("name::1","巽风震雷刀");
operations.opsForValue().set("name::2","风雷步");
operations.opsForValue().set("name::3","夺魂:阔");
return redisTemplate.exec();
}
});
}
setnx:redis提供的分布式锁
Boolean bool = redisTemplate.opsForValue().setIfAbsent("lock","uuid");
存在问题:线程还没释放锁系统宕机了,造成死锁
setnx +setex:给锁设置过期时间,到期自动删除。
Boolean bool = redisTemplate.opsForValue().setIfAbsent("lock","uuid");
redisTemplate.expire("lock",60, TimeUnit.SECONDS);
存在问题:因为加锁和过期时间设置非原子,存在设置超时时间失败情况,导致死锁
set(key,value,nx,px):将setnx+setex变成原子操作
Boolean bool = redisTemplate.opsForValue().setIfAbsent("lock","uuid",60,TimeUnit.SECONDS);
存在问题:加锁和释放锁不是同一个线程的问题。假如线程1业务还没执行完,锁过期释放,线程2获取锁执行,线程1执行完业务删除锁删除的就是线程2的,然后其他线程又可获取锁执行,线程2执行完释放锁删除的是别人的,如此往复,导致并发安全问题。
方法1:在value中存入uuid(线程唯一标识),删除锁时判断该标识,
String uuid = UUID.randomUUID().toString().replace("-","");
String lockKey = "lock";
Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lockKey, uuid);
// 加锁失败
if (!aBoolean) {
return "加锁失败";
}
if (redisTemplate.opsForValue().get(lockKey).equals(uuid)){
//加锁,删除
redisTemplate.delete(lockKey);
}
同时删除锁需保证原子性,否则还是有删除别人锁问题,可通过lua或者redis事务释放锁
String uuid = UUID.randomUUID().toString().replace("-","");
String lockKey = "lock";
try {
Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(lockKey, uuid);
// 加锁失败
if (!aBoolean) {
return "加锁失败";
}
// TODO 业务逻辑
} finally {
/**使用lua脚本加锁,保证原子性**/
// 定义lua脚本
String script = "if redis.call('get',KEYS[1]) == ARGV[1] " +
"then " +
"return redis.call('del',KEYS[1]) " +
"else " +
" return 0 " +
"end";
// 使用reids执行lua脚本
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
// 设置一下返回值类型 为Long
// 因为删除判断的时候,返回0给其封装为数据类型。如果不封装那么默认为String类型
redisScript.setResultType(Long.class);
/**
* 第一个参数: lua脚本
* 第二个参数: 需要判断的key
* 第三个参数: key所对应的值
*/
redisTemplate.execute(redisScript, Arrays.asList(lockKey), uuid);
}
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。
其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。
Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
引入redisson-spring-boot-starter
,不过这里需要注意springboot与redisson的版本,因为官方推荐redisson版本与springboot版本配合使用。版本不匹配会导致问题。注意springboot最低版本不要低于1.3.x即可。
如: Spring Boot: 2.0.9.RELEASE
,使用redisson-spring-data-20
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.6</version>
<exclusions>
<exclusion>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-data-23</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-data-20</artifactId>
<version>3.13.6</version>
</dependency>
方法2:利用redis提供的第三方类库,Redisson也可解决任务超时,锁自动释放问题。其通过开启另一个服务,后台进程定时检查持有锁的线程是否继续持有锁了,是将锁的生命周期重置到指定时间,即防止线程释放锁之前过期,所以将锁声明周期通过重置延长。Redission也可解决不可重入问题(AQS,计数)
1、加锁
/**
* 加锁,设置有效期并指定时间单位
* @param leaseTime 有效时间
* @param unit 时间单位
*
*/
void lock(); // 阻塞式等待。不设置过期时间时,默认30s
1、锁的自动续期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期被删掉
2、加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除
void lock(long leaseTime, TimeUnit unit);
1、指定过期时间、看门狗生效不会自动续期。所以指定过期时间一定要大于业务的执行时间
2、尝试获取锁
/**
* 尝试获取锁,获取到则持有该锁leaseTime时间.
* 若未获取到,在waitTime时间内一直尝试获取,超过watiTime还未获取到则返回false
* @param waitTime 尝试获取时间
* @param leaseTime 锁持有时间
* @param unit 时间单位
* @return true-获取锁成功 false-获取锁失败
*/
boolean tryLock(); // 不设置过期时间时,redisson默认30s
boolean tryLock(long time, TimeUnit unit)
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)
3、释放锁
/**
* 释放锁
*/
void unlock();
4、判断是否持有锁
/**
* 检查锁是否被任何线程锁定,如果锁定则返回:true,否则返回false
**/
boolean isLocked();
/**
* 检查当前线程是否持有此锁。如果当前线程持有则返回:true,否则返回false
**/
boolean isHeldByCurrentThread();
Redisson指定和不指定超时时间的主要区别是,加锁成功之后的逻辑不一样,不指定超时时间时,会开启watchdog后台线程,不断的续约加锁时间,而指定超时时间,就不会去开启watchdog定时任务,这样就不会续约,加锁key到了过期时间就会自动删除,也就达到了释放锁的目的。
在实际项目中,指不指定锁的超时时间是根据具体的业务来的,如果你能够比较准确的预估出代码执行的时间,那么可以指定锁超时释放时间来防止业务执行错误导致无法释放锁的问题,如果不能预估出代码执行的时间,那么可以不指定超时时间。
Redisson 锁的种类有以下几种:可重入锁,公平锁,联锁,红锁,读写锁
一个线程在执行一个带锁的方法,该方法中又调用了另一个需要相同锁的方法,则该线程可以直接执行调用的方法,而无需重新获得锁。
public void method1() {
RLock lock = redissonClient.getLock("lock");
try {
lock.lock();
method2();
} finally {
lock.unlock();
}
System.out.println("释放锁成功");
}
public void method2() {
RLock lock = redissonClient.getLock("lock");
try {
if (lock.tryLock()) {
System.out.println("加锁成功");
//业务逻辑
}
} finally {
lock.unlock();
}
}
它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。
在锁争夺较多时,程序利用大量cas去获取锁非常消耗CPU,可以考虑设置为公平锁。在锁的抢夺较少的时候就没必要设置成公平锁,毕竟公平锁也是需要成本的。
RLock fairLock = redisson.getFairLock("anyLock");
// 最常见的使用方法
fairLock.lock();
基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。
联锁指的是:同时对多个资源进行加锁操作,只有所有资源都加锁成功的时候,联锁才会成功。
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
//业务逻辑
lock.unlock();
基于Redis的Redisson红锁RedissonRedLock对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。
与联锁比较相似,都是对多个资源进行加锁,但是红锁与连锁不同的是,红锁只需要在大部分资源加锁成功即可,大部分是指n/2+1
。
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。 这里有3个锁,至少2个加锁成功,才算真的加锁成功
lock.lock();
...
lock.unlock();
读写锁:允许多个线程同时读,但是只允许一个线程写,在线程获取到写锁的时候,其他写操作和读操作都会处于阻塞状态,读锁和写锁也是互斥的,所以在读的时候是不允许写的。
/**
* 保证一定能读到最新数据,修改期间,写锁是一个排他锁(互斥锁)。读锁是一个共享锁
* 写锁没释放,读就必须等待
*/
@GetMapping("/read")
@ResponseBody
public String readValue() {
RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
RLock rLock = readWriteLock.readLock();
try {
rLock.lock();
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return "";
}
@GetMapping("/write")
@ResponseBody
public String writeValue() {
RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock");
RLock rLock = readWriteLock.writeLock();
try {
//1、改数据加写锁,读数据加读锁
rLock.lock();
TimeUnit.SECONDS.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return "";
}
@Data
@ConfigurationProperties(prefix = "session.redis")
public class RedissonProperties {
private String host = "127.0.0.1";
private String port = "6379";
private String password = "";
private Integer timeout = 1000;
}
@Configuration
@EnableConfigurationProperties(RedissonProperties.class)
public class RedissonConfig {
@Autowired
private RedissonProperties redissonProperties;
/**
* Redisson客户端注册
* 单机模式
*/
@Bean(value = "redissonClient", destroyMethod = "shutdown")
public RedissonClient createRedissonClient() {
Config config = new Config();
SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setAddress("redis://" + redissonProperties.getHost() + ":" + redissonProperties.getPort());
singleServerConfig.setTimeout(redissonProperties.getTimeout());
//密码不为空才设置,否则默认yml注入的空串会导致创建redisson不成功
if(!StringUtils.isBlank(redissonProperties.getPassword())){
singleServerConfig.setPassword(redissonProperties.getPassword());
}
return Redisson.create(config);
}
}
@RestController
@Slf4j
public class RedisController {
@Autowired
private RedissonClient redissonClient;
static AtomicInteger threadNum = new AtomicInteger(1);
static final ExecutorService threadPoolExecutor = new ThreadPoolExecutor(
2,
4,
5,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(50),
r -> {
Thread thread = new Thread(r);
thread.setName("thread-pool-" + threadNum.getAndIncrement());
return thread;
},
new ThreadPoolExecutor.AbortPolicy());
@GetMapping("/testLock")
public String testLock(String key) {
log.info("线程池中正在执行任务的线程数量: {}", threadPoolExecutor.getActiveCount());
log.info("线程池已完成的任务数量: {}", threadPoolExecutor.getCompletedTaskCount());
log.info("线程池的核心线程数量: {}", threadPoolExecutor.getCorePoolSize());
log.info("线程池曾经创建过的最大线程数量: {}", threadPoolExecutor.getLargestPoolSize());
log.info("线程池的最大线程数量: {}", threadPoolExecutor.getMaximumPoolSize());
log.info("线程池当前的线程数量: {}", threadPoolExecutor.getPoolSize());
log.info("线程池已经执行的和未执行的任务总数: {}", threadPoolExecutor.getTaskCount());
log.info("线程池中正在执行任务的线程数量: {}", threadPoolExecutor.getActiveCount());
RLock lock = redissonClient.getLock(key);
// 如果锁定为True,否则为false
if (!lock.isLocked()) {
threadPoolExecutor.execute(new SendBankTask(lock, key));
}
}
// 定时任务模拟并发调用 5秒执行一次
@Scheduled(cron = "0/5 * * * * MON-SAT")
public void invoker() {
RestTemplate restTemplate = new RestTemplate();
//String uuid = UUID.randomUUID().toString().replace("-", "");
if (new Random().nextInt() %2==0) {
String key = "ffs_mdtrt_merge_d#SendBank#2022#430500";
String url = "http://localhost:8080/testLock?key={1}";
restTemplate.getForObject(url, String.class, key);
} else {
String key = "ffs_mdtrt_merge_d#SendBank#2022#430990";
String url = "http://localhost:8080/testLock?key={1}";
restTemplate.getForObject(url, String.class, key);
}
}
}
@Slf4j
class SendBankTask implements Runnable {
private String key;
private RLock lock;
public SendBankTask(RLock lock, String key) {
this.lock = lock;
this.key = key;
}
@Override
public void run() {
boolean lockFlag = lock.tryLock();
long id = Thread.currentThread().getId();
log.info("{}:{}", key, lockFlag);
if (lockFlag) {
try {
log.info(id + "=====执行业务代码 start=====");
TimeUnit.SECONDS.sleep(20);
log.info(id + "=====执行业务代码 end =====");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(lock.isHeldByCurrentThread()){ // 时候是当前执行线程的锁
lock.unlock(); // 释放锁
log.info("释放分布式锁成功key:{}", key);
}
}
}
}
}
问题:但上述方案能保证单机系统下的并发访问安全,实际为了保证redis高可用,redis一般会集群部署。单机解决方案会出现锁丢失问题。如线程set值后成功获取锁但主节点还没来得及同步就宕机了,从节点选举成为主节点,没有锁信息,此时其他线程就可以加锁成功,导致并发问题。
引入redis
的starter
依赖
<!--redis start-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--redis end-->
<!--cache start-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<!--cache end-->
<!--mysql-connector start-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!--mysql-connector end-->
<!--mybatis start-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.2</version>
</dependency>
<!--mybatis end-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
配置redis
连接
spring:
datasource:
url: jdbc:mysql://172.16.0.192:3306/springboot_cache
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
redis:
host: 172.16.0.192 # 主机地址
cache:
redis:
time-to-live: -1 #毫秒
#开启驼峰命名
mybatis:
configuration:
map-underscore-to-camel-case: true
logging:
level:
cn:
panyucbale:
springboot:
mapper: debug
测试redisTemplate
常用api
@SpringBootTest
class Springboot08RedisApplicationTests {
@Autowired
StringRedisTemplate stringRedisTemplate; //操作k-v都是字符串的
@Autowired
private RedisTemplate redisTemplate; //k-v都是对象的
/**
* Redis常见的五大数据类型
* String(字符串)、List(列表)、Set(集合)、Hash(散列)、ZSet(有序集合)
* stringRedisTemplate.opsForValue()[String(字符串)]
* stringRedisTemplate.opsForList()[List(列表)]
* stringRedisTemplate.opsForSet()[Set(集合)]
* stringRedisTemplate.opsForHash()[Hash(散列)]
* stringRedisTemplate.opsForZSet()[ZSet(有序集合)]
*/
@Test
public void test01() {
//给redis中保存数据
//stringRedisTemplate.opsForValue().set("string_msg","hello");
String msg = stringRedisTemplate.opsForValue().get("string_msg");
System.out.println(msg);
// stringRedisTemplate.opsForList().leftPush("mylist","1");
// stringRedisTemplate.opsForList().leftPush("mylist","2");
}
}
@SuppressWarnings(value = { "unchecked", "rawtypes" })
@Component
public class RedisService
{
@Autowired
public RedisTemplate redisTemplate;
/**
* 缓存基本的对象,Integer、String、实体类等
*
* @param key 缓存的键值
* @param value 缓存的值
*/
public <T> void setCacheObject(final String key, final T value)
{
redisTemplate.opsForValue().set(key, value);
}
/**
* 缓存基本的对象,Integer、String、实体类等
*
* @param key 缓存的键值
* @param value 缓存的值
* @param timeout 时间
* @param timeUnit 时间颗粒度
*/
public <T> void setCacheObject(final String key, final T value, final Long timeout, final TimeUnit timeUnit)
{
redisTemplate.opsForValue().set(key, value, timeout, timeUnit);
}
/**
* 设置有效时间
*
* @param key Redis键
* @param timeout 超时时间
* @return true=设置成功;false=设置失败
*/
public boolean expire(final String key, final long timeout)
{
return expire(key, timeout, TimeUnit.SECONDS);
}
/**
* 设置有效时间
*
* @param key Redis键
* @param timeout 超时时间
* @param unit 时间单位
* @return true=设置成功;false=设置失败
*/
public boolean expire(final String key, final long timeout, final TimeUnit unit)
{
return redisTemplate.expire(key, timeout, unit);
}
/**
* 获取有效时间
*
* @param key Redis键
* @return 有效时间
*/
public long getExpire(final String key)
{
return redisTemplate.getExpire(key);
}
/**
* 判断 key是否存在
*
* @param key 键
* @return true 存在 false不存在
*/
public Boolean hasKey(String key)
{
return redisTemplate.hasKey(key);
}
/**
* 获得缓存的基本对象。
*
* @param key 缓存键值
* @return 缓存键值对应的数据
*/
public <T> T getCacheObject(final String key)
{
ValueOperations<String, T> operation = redisTemplate.opsForValue();
return operation.get(key);
}
/**
* 删除单个对象
*
* @param key
*/
public boolean deleteObject(final String key)
{
return redisTemplate.delete(key);
}
/**
* 删除集合对象
*
* @param collection 多个对象
* @return
*/
public boolean deleteObject(final Collection collection)
{
return redisTemplate.delete(collection) > 0;
}
/**
* 缓存List数据
*
* @param key 缓存的键值
* @param dataList 待缓存的List数据
* @return 缓存的对象
*/
public <T> long setCacheList(final String key, final List<T> dataList)
{
Long count = redisTemplate.opsForList().rightPushAll(key, dataList);
return count == null ? 0 : count;
}
/**
* 获得缓存的list对象
*
* @param key 缓存的键值
* @return 缓存键值对应的数据
*/
public <T> List<T> getCacheList(final String key)
{
return redisTemplate.opsForList().range(key, 0, -1);
}
/**
* 缓存Set
*
* @param key 缓存键值
* @param dataSet 缓存的数据
* @return 缓存数据的对象
*/
public <T> BoundSetOperations<String, T> setCacheSet(final String key, final Set<T> dataSet)
{
BoundSetOperations<String, T> setOperation = redisTemplate.boundSetOps(key);
Iterator<T> it = dataSet.iterator();
while (it.hasNext())
{
setOperation.add(it.next());
}
return setOperation;
}
/**
* 获得缓存的set
*
* @param key
* @return
*/
public <T> Set<T> getCacheSet(final String key)
{
return redisTemplate.opsForSet().members(key);
}
/**
* 缓存Map
*
* @param key
* @param dataMap
*/
public <T> void setCacheMap(final String key, final Map<String, T> dataMap)
{
if (dataMap != null) {
redisTemplate.opsForHash().putAll(key, dataMap);
}
}
/**
* 获得缓存的Map
*
* @param key
* @return
*/
public <T> Map<String, T> getCacheMap(final String key)
{
return redisTemplate.opsForHash().entries(key);
}
/**
* 往Hash中存入数据
*
* @param key Redis键
* @param hKey Hash键
* @param value 值
*/
public <T> void setCacheMapValue(final String key, final String hKey, final T value)
{
redisTemplate.opsForHash().put(key, hKey, value);
}
/**
* 获取Hash中的数据
*
* @param key Redis键
* @param hKey Hash键
* @return Hash中的对象
*/
public <T> T getCacheMapValue(final String key, final String hKey)
{
HashOperations<String, String, T> opsForHash = redisTemplate.opsForHash();
return opsForHash.get(key, hKey);
}
/**
* 获取多个Hash中的数据
*
* @param key Redis键
* @param hKeys Hash键集合
* @return Hash对象集合
*/
public <T> List<T> getMultiCacheMapValue(final String key, final Collection<Object> hKeys)
{
return redisTemplate.opsForHash().multiGet(key, hKeys);
}
/**
* 删除Hash中的某条数据
*
* @param key Redis键
* @param hKey Hash键
* @return 是否成功
*/
public boolean deleteCacheMapValue(final String key, final String hKey)
{
return redisTemplate.opsForHash().delete(key, hKey) > 0;
}
/**
* 获得缓存的基本对象列表
*
* @param pattern 字符串前缀
* @return 对象列表
*/
public Collection<String> keys(final String pattern)
{
return redisTemplate.keys(pattern);
}
}
opsForValue().set()
// 存入, 无过期时间
redisTemplate.opsForValue().set("key1","value1");
//存入, 有过期时间,过期时间到后,自动删除
redisTemplate.opsForValue().set("key1","value1",60,TimeUnit.SECONDS);
opsForValue().setIfAbsent()
: 分布式锁常用
//当前key不存在,写入值, 并返回true; 当前key已经存在,不处理, 返回false; Absent: 缺少的,
Boolean bool = redisTemplate.opsForValue().setIfAbsent("key1","value1",60,TimeUnit.SECONDS);
opsForValue().setIfPresent()
//当前key已经存在,写入值, 并返回true; 当前key不存在,不处理, 返回false; ;Present: 存在的
Boolean bool1 = redisTemplate.opsForValue().setIfPresent("key1","value1",60,TimeUnit.SECONDS);
opsForValue().getAndSet()
//获取原来key的value, 再将新的value写入
String dataStr1 = redisTemplate.opsForValue().getAndSet("key1", "value2");
opsForValue().multiSet()
//批量设置
Map<String, String> map = new HashMap<>();
map.put("key1", "value1");
map.put("key2", "value2");
redisTemplate.opsForValue().multiSet(map);
opsForValue().get()
String dataStr = redisTemplate.opsForValue().get("key1");
opsForValue().getAndSet()
//获取原来key的value, 再将新的value写入
String dataStr1 = redisTemplate.opsForValue().getAndSet("key1", "value2");
opsForValue().multiGet()
//批量获取
List<String> keyList = new ArrayList<>();
keyList.add("key1");
keyList.add("key2");
List<String> valueList = redisTemplate.opsForValue().multiGet(keyList);
opsForValue().size()
//获取value的字符长度
Long len = redisTemplate.opsForValue().size("key1");
opsForValue().append()
//追加到末尾, 返回追加后的字符长度; 如果key不存在,则新设置value; 如果key存在,则原value追加新的value;
Integer a = redisTemplate.opsForValue().append("key1","value2");
opsForValue().increment()
//值进行递增或递减, 返回新value;要求value必须可转成数值型
//如果key不存在, 则在默认值0的基础上进行递增或递减
//increment: 递增;可以是使用负数进行递减
Long num1 = redisTemplate.opsForValue().increment("key1", -1);
Long num = redisTemplate.opsForValue().increment("key1", 1);
opsForValue().decrement()
//值进行递增或递减, 返回新的值;要求value必须可转成数值型
//如果key不存在, 则在默认值0的基础上进行递增或递减
//decrement: 递减;可以是使用负数进行递增
Long num2 = redisTemplate.opsForValue().decrement("key2", 1);
Long num3 = redisTemplate.opsForValue().decrement("key2", -1);
特别注意:decrement使用时, 当参数delta递减值为负数时,表示递增,逻辑相反
redisTemplate.delete("key1");
CacheManager
===ache
缓存组件来实际给缓存中存取数据
redis
的starter
,容器中保存的是 RedisCacheManager
;RedisCacheManager
帮我们创建RedisCache
来作为缓存组件;RedisCache
通过操作redis
缓存数据的 k-v
都是Object
;利用序列化保存;如何保存为json
redis
的starter
,cacheManager
变为 RedisCacheManager
;RedisTemplate<Object, Object>
是 默认使用jdk
的序列化机制@Configuration
@EnableCaching
public class RedisConfig {
/**
* 自定义key规则
* @return
*/
@Bean
public KeyGenerator keyGenerator() {
return new KeyGenerator() {
@Override
public Object generate(Object target, Method method, Object... params) {
StringBuilder sb = new StringBuilder();
sb.append(target.getClass().getName());
sb.append(method.getName());
for (Object obj : params) {
sb.append(obj.toString());
}
return sb.toString();
}
};
}
/**
* 设置RedisTemplate规则
* @param redisConnectionFactory
* @return
*/
@Bean(name = "redisTemplate")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
//使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(mapper);
template.setValueSerializer(jackson2JsonRedisSerializer);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
//使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(stringRedisSerializer);
template.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash的value序列化方式采用jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
/**
* 设置CacheManager缓存规则
* @param factory
* @return
*/
@Bean
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题),过期时间600秒
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
return cacheManager;
}
}
@EnableCaching:标记注解 @EnableCaching,开启缓存,并配置Redis缓存管理器。@EnableCaching 注释触发后置处理器, 检查每一个Spring bean 的 public 方法是否存在缓存注解。如果找到这样的一个注释, 自动创建一个代理拦截方法调用和处理相应的缓存行为
实体类Department
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Department implements Serializable {
private Integer id;
private String departmentName;
}
@Mapper
public interface DepartmentMapper {
@Select("SELECT * FROM department WHERE id = #{id}")
Department getDeptById(Integer id);
}
@Service
public class DeptService {
@Autowired
DepartmentMapper departmentMapper;
@Autowired
CacheManager cacheManager;
/**
* 缓存的数据能存入redis;
* 第二次从缓存中查询就不能反序列化回来;
* 存的是dept的json数据;CacheManager默认使用RedisTemplate<Object, Employee>操作Redis
*
* @param id
* @return
*/
@Cacheable(cacheNames = "dept")
public Department getDeptById(Integer id) {
System.out.println("查询部门" + id);
Department department = departmentMapper.getDeptById(id);
return department;
}
// 使用缓存管理器得到缓存,进行api调用
public Department getDeptByIdManager(Integer id) {
System.out.println("查询部门" + id);
Department department = departmentMapper.getDeptById(id);
//获取某个缓存
Cache cache = cacheManager.getCache("dept");
cache.put("dept::" + id, department);
return department;
}
}
@RestController
public class DeptController {
@Autowired
DeptService deptService;
@GetMapping("/dept/{id}")
public Department getDept(@PathVariable("id") Integer id) {
return deptService.getDeptById(id);
}
@GetMapping("/depts/{id}")
public Department getDepts(@PathVariable("id") Integer id) {
return deptService.getDeptByIdManager(id);
}
}
记得在启动类中开启注解缓存,否则不会生效
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。