当前位置:   article > 正文

sharded jedis pipelined 执行后 数据并未存入redis

sharded jedis pipelined 执行后 数据并未存入redis

前言

因为历史原因,在某个同步菜单操作的方法中先清除缓存,然后在初始化缓存。本来很正常的逻辑,但是这个清除是db查询获取所有的菜单 然后循环一条条删除 然后在db查询有效的菜单操作 在循环一条条插进去 经统计这个菜单操作大概有个7千个 执行 耗时过久 大概50s -60s 不等

优化

因为一些体验问题 也自然而然 想到优化

第一种 使用并行 插入或者删除

使用到stream的parallelStream 来并行执行 由于redis本身的单线程执行限制 时间来到了 10-15秒左右 体验效果还不是很好

第二种 使用pipeline 来批量执行命令

由于并行执行 提升的效果有限,我们换个思路来解决问题,减少与redis的交互 将命令批量执行 这样就会大大减少执行耗时 时间来到了 1- 2秒这个优化效果还是比较理想的 但是也发现了新的问题

问题

虽然执行效果很快 但是在初始化缓存的时候 发现并没有成功初始化缓存

先看下 示例代码

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = App.class)
public class Test{
	@Autowired
    private ShardedJedis shardedJedis;

    @Test
    public void test(){
        ShardedJedisPipeline pipelined1 = shardedJedis.pipelined();
        //模拟业务逻辑
        for (int i = 0; i < 50; i++) {
            String key = "key:"+i;
            pipelined1.set(key,String.valueOf(i));
            pipelined1.expire(key,-1);
        }
        pipelined1.sync();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

排查定位

这代码看着 好像也没啥问题 批量执行50个key的set 以及expire 操作
最后获取pipeline所有命令的执行结果

期间以为和使用pipelined的set方法 String 入参有关 于是更换为支持byte的方法 未果

后续还以为使用用法不对,经查询多方资料后 发现用法没问题

省略其他的尝试步骤。。。。。

最后将把expire 的设置注释掉 果然可以了

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = App.class)
public class Test{
	@Autowired
    private ShardedJedis shardedJedis;

    @Test
    public void test(){
        ShardedJedisPipeline pipelined1 = shardedJedis.pipelined();
        //模拟业务逻辑
        for (int i = 0; i < 50; i++) {
            String key = "key:"+i;
            pipelined1.set(key,String.valueOf(i));
            //pipelined1.expire(key,-1);
        }
        pipelined1.sync();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

最后问题定位到 是因为 pipelined1.expire(key,-1) 命令执行导致数据无法存入redis

分析

pipelined1.expire(key,-1)  
  • 1

这个命令看似很正常 想法是设置一个 -1 来表示这个缓存无过期时间 但实际上 好像并没有生效
查看源码后 并无没有什么特殊操作

@Deprecated
default Response<Long> expire(String key, int seconds) {
  return expire(key, (long) seconds);
}

Response<Long> expire(String key, long seconds);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

由于未使用 long类型的时间 ,默认调用时间类为 int类型的方法 最后实际上调用的还是 long类型的时间方法
再往下就直接设置命令了

@Override
public Response<Long> expire(final String key, final long seconds) {
    getClient(key).expire(key, seconds);
    return getResponse(BuilderFactory.LONG);
}

# redis.clients.jedis.BinaryClient#expire(byte[], long)
public void expire(final byte[] key, final long seconds) {
   sendCommand(EXPIRE, key, toByteArray(seconds));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

于是找到redis client 执行了命令 发现也很快失效
在这里插入图片描述
于是猜测 -1 这个过期时间会被设置 可能失效时间很短 有可能是 1毫秒 或者1 毫秒
带着问题 去找了下官方文档 看到这样一句描述
在这里插入图片描述
好像只写到了 会将过期时间戳存储为 绝对值 至于传入的时间 为负数 该如何处理并未说明

那就再来看下源码的逻辑
过期命令的实现类在 https://github.com/redis/redis/blob/unstable/src/expire.c

/* EXPIRE key seconds [ NX | XX | GT | LT] */
void expireCommand(client *c) {
    expireGenericCommand(c,commandTimeSnapshot(),UNIT_SECONDS);
}

//核心调用方法
void expireGenericCommand(client *c, long long basetime, int unit) {
    robj *key = c->argv[1], *param = c->argv[2];
    long long when; /* unix time in milliseconds when the key will expire. */
    long long current_expire = -1;
    int flag = 0;

    /* checking optional flags */
    if (parseExtendedExpireArgumentsOrReply(c, &flag) != C_OK) {
        return;
    }
    //解析我们传入的时间参数 并赋值给when 这里我们传入的是-1
    if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK)
        return;

    /* EXPIRE allows negative numbers, but we can at least detect an
     * overflow by either unit conversion or basetime addition. */
    if (unit == UNIT_SECONDS) {
        if (when > LLONG_MAX / 1000 || when < LLONG_MIN / 1000) {
            addReplyErrorExpireTime(c);
            return;
        }
        when *= 1000;
    }

    if (when > LLONG_MAX - basetime) {
        addReplyErrorExpireTime(c);
        return;
    }
    // 时间戳计算 这里相当于是 当前时间戳 -1 
    when += basetime;

    /* No key, return zero. */
    if (lookupKeyWrite(c->db,key) == NULL) {
        addReply(c,shared.czero);
        return;
    }

    if (flag) {
        current_expire = getExpire(c->db, key);

        /* NX option is set, check current expiry */
        if (flag & EXPIRE_NX) {
            if (current_expire != -1) {
                addReply(c,shared.czero);
                return;
            }
        }

        /* XX option is set, check current expiry */
        if (flag & EXPIRE_XX) {
            if (current_expire == -1) {
                /* reply 0 when the key has no expiry */
                addReply(c,shared.czero);
                return;
            }
        }

        /* GT option is set, check current expiry */
        if (flag & EXPIRE_GT) {
            /* When current_expire is -1, we consider it as infinite TTL,
             * so expire command with gt always fail the GT. */
            if (when <= current_expire || current_expire == -1) {
                /* reply 0 when the new expiry is not greater than current */
                addReply(c,shared.czero);
                return;
            }
        }

        /* LT option is set, check current expiry */
        if (flag & EXPIRE_LT) {
            /* When current_expire -1, we consider it as infinite TTL,
             * but 'when' can still be negative at this point, so if there is
             * an expiry on the key and it's not less than current, we fail the LT. */
            if (current_expire != -1 && when >= current_expire) {
                /* reply 0 when the new expiry is not less than current */
                addReply(c,shared.czero);
                return;
            }
        }
    }

    //检测设置的过期时间 是否已经过期 
    if (checkAlreadyExpired(when)) {
        // 过期执行删除逻辑
        robj *aux;

        int deleted = dbGenericDelete(c->db,key,server.lazyfree_lazy_expire,DB_FLAG_KEY_EXPIRED);
        serverAssertWithInfo(c,key,deleted);
        server.dirty++;

        /* Replicate/AOF this as an explicit DEL or UNLINK. */
        aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
        rewriteClientCommandVector(c,2,aux,key);
        signalModifiedKey(c,c->db,key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
        //删除后 回复了一个 1 和我们之前测试的情况相符
        addReply(c, shared.cone);
        return;
    } else {
        setExpire(c,c->db,key,when);
        addReply(c,shared.cone);
        /* Propagate as PEXPIREAT millisecond-timestamp
         * Only rewrite the command arg if not already PEXPIREAT */
        if (c->cmd->proc != pexpireatCommand) {
            rewriteClientCommandArgument(c,0,shared.pexpireat);
        }

        /* Avoid creating a string object when it's the same as argv[2] parameter  */
        if (basetime != 0 || unit == UNIT_SECONDS) {
            robj *when_obj = createStringObjectFromLongLong(when);
            rewriteClientCommandArgument(c,2,when_obj);
            decrRefCount(when_obj);
        }

        signalModifiedKey(c,c->db,key);
        notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
        server.dirty++;
        return;
    }
}

//只有在非加载数据和非从实例的情况下,当 when 小于等于当前时间戳时,checkAlreadyExpired 函数才会返回 true,表示该过期时间已经过期,可以立即删除该键。
int checkAlreadyExpired(long long when) {
    /* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
     * should never be executed as a DEL when load the AOF or in the context
     * of a slave instance.
     *
     * Instead we add the already expired key to the database with expire time
     * (possibly in the past) and wait for an explicit DEL from the master. */
    return (when <= commandTimeSnapshot() && !server.loading && !server.masterhost);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137

看了代码后 思路也清晰了, 这个设置时间过期的逻辑 我们简单梳理下
这个当执行过期时间命令时,我们会传入 key 以及 过期时间(单位秒 或者 毫秒值) 以及 flag 参数 例如 nx xx 等等
核心的逻辑

  • 判断时间参数
  • 计算过期时间 = 当前时间戳 + 传入的过期时间参数 (单位秒/毫秒)
  • 执行 flag参数 逻辑
  • 执行 checkAlreadyExpired 判断时间是否已经过期 只有在 只有在非加载数据和非从实例的情况下,当 when 小于等于当前时间戳时,checkAlreadyExpired 函数才会返回 true 就会走到删除key的逻辑 并返回
  • 没过期则进行设置新的过期时间 并返回

回到我们的执行操作中,我们执行expire 命令传入的时间参数为-1, 那过期时间就设置为当前时间戳 - 1000 。最后又因为设置的过期时间满足过期条件 (when 小于等于当前时间戳 非加载数据和非从实例),所以我们key 立刻会被删除 。这就导致了虽然我们方法执行完成,但是缓存却没有。

解决

当需要设置一个没有过期时间的key的话 无需要调用expire方法 因为默认没有设置过期时间的话 就是永久不失效
在这里插入图片描述
参考官方文档: 官方文档地址: https://redis.io/docs/latest/commands/expire/


good day !!!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/613143
推荐阅读
相关标签
  

闽ICP备14008679号