当前位置:   article > 正文

Redis集群模式下使用Scan命令模糊查询(Java)_redis集群 scan

redis集群 scan

集群模式下scan命令的JAVA代码请拉倒最后,前面为scan命令原理介绍。

1. Redis–单线程

由于Redis是单线程的,因此在使用一些时间复杂度为O(N)的命令时要非常谨慎。可能一不小心就会阻塞进程,导致Redis出现卡顿。

2. keys命令

提到Redis的模糊搜索,很多人的第一反应都是keys命令,但其有两个致命的缺点,在使用时一定要谨慎。

  • 没有limit,我们只能一次性获取所有符合条件的key,如果结果有上百万条,那么等待你的就是“无穷无尽”的字符串输出。
  • keys命令是遍历算法,时间复杂度是O(N)。这个命令非常容易导致Redis服务卡顿。在生产环境中尽量避免使用该命令。

3. scan命令

在Redis2.8时,scan命令应运而生。
相比于keys命令,scan命令有两个比较明显的优势:

  • scan命令的时间复杂度虽然也是O(N),但它是分次进行的,不会阻塞线程。
  • scan命令提供了limit参数,可以控制每次返回结果的最大条数。
    但是scan命令并不是完美的,它返回的结果有可能重复,因此需要客户端去重。

4. Redis结构

Redis使用了Hash表作为底层实现,原因不外乎高效且实现简单。说到Hash表,很多Java程序员第一反应就是HashMap。没错,Redis底层key的存储结构就是类似于HashMap那样数组+链表的结构。其中第一维的数组大小为2n(n>=0)。每次扩容数组长度扩大一倍。

scan命令就是对这个一维数组进行遍历。每次返回的游标值是这个数组的索引。limit参数表示遍历多少个数组的元素,将这些元素下挂接的符合条件的结果都返回。因为每个元素下挂接的链表大小不同,所以每次返回的结果数量也就不同。

5. SCAN的遍历顺序

我们采用一个例子来说明scan命令的遍历顺序

127.0.0.1:6379[3]> keys *
1) "bar"
2) "qux"
3) "baz"
4) "foo"
127.0.0.1:6379[3]> scan 0 count 1
1) "2"
2) 1) "bar"
127.0.0.1:6379[3]> scan 2 count 1
1) "1"
2) 1) "foo"
127.0.0.1:6379[3]> scan 1 count 1
1) "3"
2) 1) "qux"
 2) "baz"
127.0.0.1:6379[3]> scan 3 count 1
1) "0"
2) (empty list or set)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

我们的Redis中有四个key,我们每次只遍历一个一维数组中的元素。如上所示,SCAN命令的遍历顺序是
0->2->1->7
这个顺序乍一看让人很懵逼。我们把它转成二进制再看看呢?
00->10->01->11
我们发现每次这个序列是高位加1的。普通二进制的加法,是从右往左相加、进位。而这个**序列是从左往右相加、进位(高位加1)**的。


在dict.c文件的dictScan函数中对游标进行了如下处理:

m0 = t0->sizemask;
// 将游标的umask位的bit都置为1
v |= ~m0;
// 反转游标
v = rev(v);
// 反转后+1,达到高位加1的效果
v++;
// 再次反转复位
v = rev(v);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

意思是,将游标倒置,加一后,再倒置,也就是我们所说的“高位加1”的操作。
当size为4时,sizemask为3(00000011),游标计算过程:

   v |= ~m0 v = rev(v) v++  v = rev(v)
00000000(0) -> 11111100 -> 00111111 -> 01000000 -> 00000010(2)
00000010(2) -> 11111110 -> 01111111 -> 10000000 -> 00000001(1)
00000001(1) -> 11111101 -> 10111111 -> 11000000 -> 00000011(3)
00000011(3) -> 11111111 -> 11111111 -> 00000000 -> 00000000(0)
  • 1
  • 2
  • 3
  • 4
  • 5

遍历size为4时的游标状态转移为0->2->1->3。

同理,size为8时的游标状态转移为0->4->2->6->1->5->3->7,也就是000->100->010->110->001->101->011->111。

为什么要使用这样的顺序进行遍历,而不是用正常的0、1、2?
因为需要考虑遍历时发生字典扩容与缩容的情况。

5、SCAN的遍历过程:

5.1 发生扩容时

假如我们原始的数组有4个元素,也就是索引有两位,这时需要把它扩充成3位,并进行rehash。
在这里插入图片描述
原来挂接在xx下的所有元素被分配到0xx和1xx下。在上图中,当我们即将遍历10时,dict进行了rehash,这时,scan命令会从010开始遍历,而000和100(原00下挂接的元素)不会再被重复遍历

5.2 发生缩容时

假设dict从3位缩容到2位,当即将遍历110时,dict发生了缩容,这时scan会遍历10。这时010下挂接的元素会被重复遍历,但010之前的元素都不会被重复遍历了。所以,缩容时还是可能会有些重复元素出现的。

6. Redis的rehash

rehash是一个比较复杂的过程,为了不阻塞Redis的进程,其采用了一种渐进式的rehash的机制。

在Redis的字典结构中,有两个hash表,一个新表,一个旧表。在rehash的过程中,redis将旧表中的元素逐步迁移到新表中,接下来我们看一下dict的rehash操作的源码。

/* Performs N steps of incremental rehashing. Returns 1 if there are still
 * keys to move from the old to the new hash table, otherwise 0 is returned.
 *
 * Note that a rehashing step consists in moving a bucket (that may have more
 * than one key as we use chaining) from the old to the new hash table, however
 * since part of the hash table may be composed of empty spaces, it is not
 * guaranteed that this function will rehash even a single bucket, since it
 * will visit at max N*10 empty buckets in total, otherwise the amount of
 * work it does would be unbound and the function may block for a long time. */
int dictRehash(dict *d, int n) {
    int empty_visits = n*10; /* Max number of empty buckets to visit. */
    if (!dictIsRehashing(d)) return 0;

    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        de = d->ht[0].table[d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        while(de) {
            uint64_t h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }

    /* Check if we already rehashed the whole table... */
    if (d->ht[0].used == 0) {
        zfree(d->ht[0].table);
        d->ht[0] = d->ht[1];
        _dictReset(&d->ht[1]);
        d->rehashidx = -1;
        return 0;
    }

    /* More to rehash... */
    return 1;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

通过注释我们能了解到,rehash的过程是以bucket为基本单位进行迁移的。所谓的bucket其实就是我们前面所提到的一维数组的元素。每次迁移一个列表。
rehash的过程:

  • 首先判断一下是否在进行rehash,如果是,则继续进行;否则直接返回。
  • 接着就是分n步开始进行渐进式rehash。同时还判断是否还有剩余元素,以保证安全性。
  • 在进行rehash之前,首先判断要迁移的bucket是否越界。
  • 然后跳过空的bucket,这里有一个empty_visits变量,表示最大可访问的空bucket的数量,这一变量主要是为了保证不过多的阻塞Redis。
  • 元素迁移,将当前bucket的全部元素进行rehash,并且更新两张表中元素的数量。
  • 每次迁移完一个bucket,将旧表中的bucket指向NULL。
  • 最后判断一下是否全部迁移完成,如果是,则收回空间,重置rehash索引,否则告诉调用方,仍有数据未迁移。

    由于Redis使用的是渐进式rehash机制,因此,scan命令需要同时扫描新表和旧表,将结果返回客户端。

7. Java代码

private JedisCluster cluster;
    public JedisCluster getCluster() {
        return cluster;
    }

/**
* 释放连接
*/
public void close() {
   try {
        cluster.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

/**
* 初始化Redis
*/
public void initialize() {
    LOGGER.info("开始redis初始化, 地址:{}", GlobalConfig.redisAddr);
    //redis实例初始化
    String[] redis = GlobalConfig.redisAddr.split(";|,");
    Set<HostAndPort> nodes = new HashSet<>();
    for (String item : redis) {
        String[] addr = item.split(":");
        nodes.add(new HostAndPort(addr[0], Integer.parseInt(addr[1])));
    }
    JedisPoolConfig config = new JedisPoolConfig();
    config.setMaxTotal(5000);
    config.setMaxIdle(500);
    config.setMaxWaitMillis(5 * 1000);
    config.setTestOnBorrow(true);
    config.setTestOnReturn(true);
    cluster = new JedisCluster(nodes, config);
    LOGGER.info("redis初始化完成!");
}

/**
* Redis Scan集群模式中的应用
*/
public void service() {
        String keyName = "pie:task:data:";
        Set<Task> taskSet = new HashSet<>();
        AtomicInteger count = new AtomicInteger();
        JedisCluster cluster = redisDao.getCluster();
        Map<String, JedisPool> clusterNodes = cluster.getClusterNodes();
        for (Iterator<JedisPool> it = clusterNodes.values().iterator(); it.hasNext(); ) {
            JedisPool pool = it.next();
            Jedis jedis = null;
            try {
                jedis = pool.getResource();

                ScanParams scanParams = new ScanParams();
                scanParams.match(keyName + "*");
                scanParams.count(1024);
                String scanRet = "0";
                do {
                    ScanResult<String> result = jedis.scan(scanRet, scanParams);
                    scanRet = result.getStringCursor();
                    for (String key : result.getResult()) {
                        if (jedis.exists(key)) {
                            Map<String, String> taskMap = jedis.hgetAll(key);
                            Task task = Task.fromRedisMap(taskMap);
                            taskSet.add(task);
                            count.getAndIncrement();
                            if(taskSet.size() > 10000){
                            // 业务逻辑
                            taskSet.clear();
                        }
                    }
                } while (!scanRet.equals("0"));
                LOGGER.error("共从Redis中获取到:{}条数据", count);
                LOGGER.info("获取Redis数据完成");
            } catch (Exception e) {
                LOGGER.error("RedisClusterCacheUtil.keys occurred Exceotion", e);
            } finally {
                if (null != jedis)
                    jedis.close();
            }
        }
        return ;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/240390
推荐阅读
相关标签
  

闽ICP备14008679号