赞
踩
生产真实项目过程中,需要将数据库的数据同步写入redis,此过程中遇到写入redis的瓶颈。每次启动项目都要将数据库数据重载到redis,这个过程耗费了大量的时间。
Redis pipelined
Redis Pipelined是由Client提供的(是防止client端 阻塞的操作)一种请求redis的方式。Redis本身具有很高的吞吐量,因此性能最大的考察便是网络状况,如果应用到redis的网络状况不好,每次请求都将会出现轻微的 阻塞和延迟,这种延迟对于批量请求是很可怕的,譬如要进行数千次数据插入,或是批量获取数据时,我们就需要用到Pipelined。
Pipelined可以将多个请求无 阻塞的发出并按顺序将请求结果“打包”返回,这有点类似于并发请求,可以有效地利用等待结果的 阻塞时间。
注意,Pipelined并不能保证原子性,即pipelined执行的内容可能会被其他客户端或是线程的指令"插队",若想要原子性操作,需要使用事务。
基于RedisTemplate的pipelined
使用RedisTemplate可以轻松实现pipelined,需要依靠原生的RedisConnection对象实现相关操作!
pipelining(管道)
Pipeline:redis的管道命令,允许client将多个请求依次发给服务器,过程中而不需要等待请求的回复,在最后再一并读取结果即可,可以改善性能.
pipeline不是原子操作
为何用?
减少请求次数,将多条请求命令合成一次请求通过管道发给redis server,再通过回调函数一次性接收多个命令的结果,减少网络IO次数,在高并发情况下可带来明显性能提升。注意的是,redis server是单线程,多个命令合成一次请求到达redis server依然还是顺序一个个执行的,仅仅只是减少了请求IO次数。
如何用?
RedisCallback和SessionCallBack:
1.作用: 让RedisTemplate进行回调,通过他们可以在同一条连接中一次执行多个redis命令。
2.SessionCalback提供了良好的封装,优先使用它。
3.RedisCallback使用的是原生RedisConnection,用起来比较麻烦,可读性差,但原生api提供的功能比较齐全。
<dependency>
<groupId>redis.clients</groupId> #maven引入
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
compile('redis.clients:jedis:2.9.0') #gradle引入
compile('org.springframework.data:spring-data-redis:2.0.8.RELEASE')
package pipeline;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
public class BatchOperSet {
private static final String HOST = "127.0.0.1";
private static final int PORT = 6379;
// 批量插入数据到Redis,正常使用
public static void batchSetNotUsePipeline() throws Exception {
Jedis jedis = new Jedis(HOST, PORT);
String keyPrefix = "normal";
long begin = System.currentTimeMillis();
for (int i = 1; i < 10000; i++) {
String key = keyPrefix + "_" + i;
String value = String.valueOf(i);
jedis.set(key, value);
}
jedis.close();
long end = System.currentTimeMillis();
System.out.println("not use pipeline batch set total time:" + (end - begin));
}
// 批量插入数据到Redis,使用Pipeline
public static void batchSetUsePipeline() throws Exception {
Jedis jedis = new Jedis(HOST, PORT);
Pipeline pipelined = jedis.pipelined();
String keyPrefix = "pipeline";
long begin = System.currentTimeMillis();
for (int i = 1; i < 10000; i++) {
String key = keyPrefix + "_" + i;
String value = String.valueOf(i);
pipelined.set(key, value);
}
pipelined.sync();
jedis.close();
long end = System.currentTimeMillis();
System.out.println("use pipeline batch set total time:" + (end - begin));
}
public static void main(String[] args) {
try {
batchSetNotUsePipeline();
batchSetUsePipeline();
} catch (Exception e) {
e.printStackTrace();
}
}
}
写入性能对比的运行结果如下:
not use pipeline batch get total time:2990
use pipeline batch get total time:41
结论:pipeline模式的写入性能更快
真实案例的采用redis数据结构
Hash(散列)
基本概念:
Redis 散列可以存储多个键值对之间的映射。和字符串一样,散列存储的值既可以是字符串又可以是数值,并且用户同样可以对散列存储的数字值执行自增或自减操作。这个和 Java 的 HashMap 很像,每个 HashMap 有自己的名字,同时可以存储多个 k/v 对。
底层实现:
哈希对象的编码可以是 ziplist 或者 hashtable 。
哈希对象保存的所有键值对的键和值的字符串长度都小于 64 字节并且保存的键值对数量小于 512 个,使用ziplist 编码;否则使用 hashtable;
应用场景:
Hash 更适合存储结构化的数据,比如 Java 中的对象;其实 Java 中的对象也可以用 string 进行存储,只需要将 对象 序列化成 json 串就可以,但是如果这个对象的某个属性更新比较频繁的话,那么每次就需要重新将整个对象序列化存储,这样消耗开销比较大。可如果用 hash 来存储 对象的每个属性,那么每次只需要更新要更新的属性就可以。
购物车场景:可以以用户的 id 为 key ,商品的 id 为存储的 field ,商品数量为键值对的value,这样就构成了购物车的三个要素。
redis 127.0.0.1:6379> hset myhash field1 "zhang" #给键值为myhash的键设置字段为field1,值为zhang。
(integer) 1
redis 127.0.0.1:6379> hget myhash field1 #获取键值为myhash,字段为field1的值。
"zhang"
redis 127.0.0.1:6379> hget myhash field2 #myhash键中不存在field2字段,因此返回nil。
(nil)
redis 127.0.0.1:6379> hset myhash field2 "san" #给myhash添加一个新的字段field2,其值为san。
(integer) 1
redis 127.0.0.1:6379> hlen myhash #hlen命令获取myhash键的字段数量。
(integer) 2
redis 127.0.0.1:6379> hexists myhash field1 #判断myhash键中是否存在字段名为field1的字段,由于存在,返回值为1。
(integer) 1
redis 127.0.0.1:6379> hdel myhash field1 #删除myhash键中字段名为field1的字段,删除成功返回1。
(integer) 1
redis 127.0.0.1:6379> hdel myhash field1 #再次删除myhash键中字段名为field1的字段,由于上一条命令已经将其删除,因为没有删除,返回0。
(integer) 0
redis 127.0.0.1:6379> hexists myhash field1 #判断myhash键中是否存在field1字段,由于上一条命令已经将其删除,因为返回0。
(integer) 0
redis 127.0.0.1:6379> hsetnx myhash field1 zhang #通过hsetnx命令给myhash添加新字段field1,其值为zhang,因为该字段已经被删除,所以该命令添加成功并返回1。
(integer) 1
redis 127.0.0.1:6379> hsetnx myhash field1 zhang #由于myhash的field1字段已经通过上一条命令添加成功,因为本条命令不做任何操作后返回0。
(integer) 0
我们可以用Redis Desktop Manager工具进去看redis数据库里面的内容。
Hash传统代码模式如下:
#按制定的大KEY值往redis放入全部map内容
public static <T> void hPutAll(String hashKey, Map<String,T> map, long timeout){
try{
if(StringUtils.isBlank(hashKey)){
return;
}
BoundHashOperations<String, String, String> stringObjectObjectBoundHashOperations = redisClient.redisTemplate.boundHashOps(hashKey);
map.forEach((key,v)->{
stringObjectObjectBoundHashOperations.put(key,gson.toJson(v));
});
}catch (Exception e){
Logger.error("redis hputall异常",e);
}
}
Hash管道模式如下:
public static <T> void hPutAll(String hashKey, Map<String,T> map, long timeout){
try{
if(StringUtils.isBlank(hashKey)){
return;
}
redisClient.redisTemplate.executePipelined(new SessionCallback<Object>() {
public Object execute(RedisOperations ro) {
BoundHashOperations<String, String, String> hashOps = redisClient.redisTemplate
.boundHashOps(hashKey);
map.forEach((key, v) -> {
hashOps.put(key, gson.toJson(v));
});
//redisClient.redisTemplate.expire(hashKey, defaultTimeOut, TimeUnit.SECONDS);
return null;
}
});
附加set数据结构用管道模式:
@Component
public class RedisTest {
@Autowired
RedisTemplate<String, Object> redisTemplate;
@PostConstruct
public void init() {
test1();
}
public void test1() {
List<Object> pipelinedResultList = redisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
ValueOperations<String, Object> valueOperations = (ValueOperations<String, Object>) operations.opsForValue();
valueOperations.set("yzh1", "hello world");
valueOperations.set("yzh2", "redis");
valueOperations.get("yzh1");
valueOperations.get("yzh2");
// 返回null即可,因为返回值会被管道的返回值覆盖,外层取不到这里的返回值
return null;
}
});
System.out.println("pipelinedResultList=" + pipelinedResultList);
}
}
管道预热代码
有的系统对延迟要求很高,那么redis管道第一次请求很慢,就需要在系统启动时进行管道的预热,保证系统启动后每次请求的低延迟。
@PostConstruct
public void init() {
long startTime = System.currentTimeMillis();
redisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
operations.hasKey((K) "");
return null;
}
});
log.info("redis初始化管道请求end,耗时:{}ms", System.currentTimeMillis() - startTime);
}
附加LIST数据结构代码
public void redisPop(List<String> list) {
List<Object> keys = redisTemplate.executePipelined(new SessionCallback<String>() {
@Override
public <K, V> String execute(RedisOperations<K, V> redisOperations) throws DataAccessException {
for(String str: list){
for (int i = 0; i < 200; i++) {
redisOperations.opsForList().rightPop((K) str);
}
}
return null;
}
});
}
笔者简介
国内某一线知名软件公司企业认证在职员工:任JAVA高级研发工程师,大数据领域专家,数据库领域专家兼任高级DBA!10年软件开发经验!现任国内某大型软件公司大数据研发工程师、MySQL数据库DBA,软件架构师。直接参与设计国家级亿级别大数据项目!并维护真实企业级生产数据库300余个!紧急处理数据库生产事故上百起,挽回数据丢失所造成的灾难损失不计其数!并为某国家级大数据系统的技术方案(国家知识产权局颁布)专利权的第一专利发明人!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。