赞
踩
由于Redis是单线程的,因此在使用一些时间复杂度为O(N)的命令时要非常谨慎。可能一不小心就会阻塞进程,导致Redis出现卡顿。
提到Redis的模糊搜索,很多人的第一反应都是keys命令,但其有两个致命的缺点,在使用时一定要谨慎。
在Redis2.8时,scan命令应运而生。
相比于keys命令,scan命令有两个比较明显的优势:
Redis使用了Hash表作为底层实现,原因不外乎高效且实现简单。说到Hash表,很多Java程序员第一反应就是HashMap。没错,Redis底层key的存储结构就是类似于HashMap那样数组+链表的结构。其中第一维的数组大小为2n(n>=0)。每次扩容数组长度扩大一倍。
scan命令就是对这个一维数组进行遍历。每次返回的游标值是这个数组的索引。limit参数表示遍历多少个数组的元素,将这些元素下挂接的符合条件的结果都返回。因为每个元素下挂接的链表大小不同,所以每次返回的结果数量也就不同。
我们采用一个例子来说明scan命令的遍历顺序
127.0.0.1:6379[3]> keys *
1) "bar"
2) "qux"
3) "baz"
4) "foo"
127.0.0.1:6379[3]> scan 0 count 1
1) "2"
2) 1) "bar"
127.0.0.1:6379[3]> scan 2 count 1
1) "1"
2) 1) "foo"
127.0.0.1:6379[3]> scan 1 count 1
1) "3"
2) 1) "qux"
2) "baz"
127.0.0.1:6379[3]> scan 3 count 1
1) "0"
2) (empty list or set)
我们的Redis中有四个key,我们每次只遍历一个一维数组中的元素。如上所示,SCAN命令的遍历顺序是
0->2->1->7
这个顺序乍一看让人很懵逼。我们把它转成二进制再看看呢?
00->10->01->11
我们发现每次这个序列是高位加1的。普通二进制的加法,是从右往左相加、进位。而这个**序列是从左往右相加、进位(高位加1)**的。
在dict.c文件的dictScan函数中对游标进行了如下处理:
m0 = t0->sizemask;
// 将游标的umask位的bit都置为1
v |= ~m0;
// 反转游标
v = rev(v);
// 反转后+1,达到高位加1的效果
v++;
// 再次反转复位
v = rev(v);
意思是,将游标倒置,加一后,再倒置,也就是我们所说的“高位加1”的操作。
当size为4时,sizemask为3(00000011),游标计算过程:
v |= ~m0 v = rev(v) v++ v = rev(v)
00000000(0) -> 11111100 -> 00111111 -> 01000000 -> 00000010(2)
00000010(2) -> 11111110 -> 01111111 -> 10000000 -> 00000001(1)
00000001(1) -> 11111101 -> 10111111 -> 11000000 -> 00000011(3)
00000011(3) -> 11111111 -> 11111111 -> 00000000 -> 00000000(0)
遍历size为4时的游标状态转移为0->2->1->3。
同理,size为8时的游标状态转移为0->4->2->6->1->5->3->7,也就是000->100->010->110->001->101->011->111。
为什么要使用这样的顺序进行遍历,而不是用正常的0、1、2?
因为需要考虑遍历时发生字典扩容与缩容的情况。
假如我们原始的数组有4个元素,也就是索引有两位,这时需要把它扩充成3位,并进行rehash。
原来挂接在xx下的所有元素被分配到0xx和1xx下。在上图中,当我们即将遍历10时,dict进行了rehash,这时,scan命令会从010开始遍历,而000和100(原00下挂接的元素)不会再被重复遍历。
假设dict从3位缩容到2位,当即将遍历110时,dict发生了缩容,这时scan会遍历10。这时010下挂接的元素会被重复遍历,但010之前的元素都不会被重复遍历了。所以,缩容时还是可能会有些重复元素出现的。
rehash是一个比较复杂的过程,为了不阻塞Redis的进程,其采用了一种渐进式的rehash的机制。
在Redis的字典结构中,有两个hash表,一个新表,一个旧表。在rehash的过程中,redis将旧表中的元素逐步迁移到新表中,接下来我们看一下dict的rehash操作的源码。
/* Performs N steps of incremental rehashing. Returns 1 if there are still
* keys to move from the old to the new hash table, otherwise 0 is returned.
*
* Note that a rehashing step consists in moving a bucket (that may have more
* than one key as we use chaining) from the old to the new hash table, however
* since part of the hash table may be composed of empty spaces, it is not
* guaranteed that this function will rehash even a single bucket, since it
* will visit at max N*10 empty buckets in total, otherwise the amount of
* work it does would be unbound and the function may block for a long time. */
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (!dictIsRehashing(d)) return 0;
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {
uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}
/* Check if we already rehashed the whole table... */
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}
/* More to rehash... */
return 1;
}
通过注释我们能了解到,rehash的过程是以bucket为基本单位进行迁移的。所谓的bucket其实就是我们前面所提到的一维数组的元素。每次迁移一个列表。
rehash的过程:
private JedisCluster cluster;
public JedisCluster getCluster() {
return cluster;
}
/**
* 释放连接
*/
public void close() {
try {
cluster.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 初始化Redis
*/
public void initialize() {
LOGGER.info("开始redis初始化, 地址:{}", GlobalConfig.redisAddr);
//redis实例初始化
String[] redis = GlobalConfig.redisAddr.split(";|,");
Set<HostAndPort> nodes = new HashSet<>();
for (String item : redis) {
String[] addr = item.split(":");
nodes.add(new HostAndPort(addr[0], Integer.parseInt(addr[1])));
}
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(5000);
config.setMaxIdle(500);
config.setMaxWaitMillis(5 * 1000);
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
cluster = new JedisCluster(nodes, config);
LOGGER.info("redis初始化完成!");
}
/**
* Redis Scan集群模式中的应用
*/
public void service() {
String keyName = "pie:task:data:";
Set<Task> taskSet = new HashSet<>();
AtomicInteger count = new AtomicInteger();
JedisCluster cluster = redisDao.getCluster();
Map<String, JedisPool> clusterNodes = cluster.getClusterNodes();
for (Iterator<JedisPool> it = clusterNodes.values().iterator(); it.hasNext(); ) {
JedisPool pool = it.next();
Jedis jedis = null;
try {
jedis = pool.getResource();
ScanParams scanParams = new ScanParams();
scanParams.match(keyName + "*");
scanParams.count(1024);
String scanRet = "0";
do {
ScanResult<String> result = jedis.scan(scanRet, scanParams);
scanRet = result.getStringCursor();
for (String key : result.getResult()) {
if (jedis.exists(key)) {
Map<String, String> taskMap = jedis.hgetAll(key);
Task task = Task.fromRedisMap(taskMap);
taskSet.add(task);
count.getAndIncrement();
if(taskSet.size() > 10000){
// 业务逻辑
taskSet.clear();
}
}
} while (!scanRet.equals("0"));
LOGGER.error("共从Redis中获取到:{}条数据", count);
LOGGER.info("获取Redis数据完成");
} catch (Exception e) {
LOGGER.error("RedisClusterCacheUtil.keys occurred Exceotion", e);
} finally {
if (null != jedis)
jedis.close();
}
}
return ;
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。