当前位置:   article > 正文

通过redis加分布式锁 解决mysql读写冲突_redis读写冲突

redis读写冲突

背景

写业务过程中,有时候会使用到锁的概念,同时只能有一个人可以操作某个行为。这个时候我们就要用到锁。
锁的方式有好几种,某些语言如php不能在内存中用锁,不能使用zookeeper加锁,使用数据库做锁又消耗比较大,这个时候我们一般会选用redis做锁机制。

分布式锁介绍

本部分引用自:https://blog.csdn.net/oqkdws/article/details/81985699

分布式锁是控制分布式系统或不同系统之间共同访问共享资源的一种锁实现,如果不同的系统或同一个系统的不同主机之间共享了某个资源时,往往需要互斥来防止彼此干扰来保证一致性。

比如redis的操作虽然是原子性的,但是,当你的接口被多个用户同时请求的时候,由于读取写入之间存在间隔 就会出现问题

分布式锁需要具备哪些条件?

  • 互斥性:在任意一个时刻,只有一个客户端持有锁。
  • 无死锁:即便持有锁的客户端崩溃或者其他意外事件,锁仍然可以被获取。
  • 容错:只要大部分Redis节点都活着,客户端就可以获取和释放锁

分布式锁实现方式

分布式锁的主要实现有哪些?

  • 数据库
  • Memcached(add命令)
  • Redis(setnx命令)
  • Zookeeper(临时节点)
    https://blog.csdn.net/oqkdws/article/details/81985699

Redis单机锁

由于读写问题 redis的锁需要读写主库 别分库了

setnx

$rs = $redis->setNX($key, $value); 
if ($rs) { 
//处理更新缓存逻辑 
// ...... 
//删除锁 $redis->del($key); 
} 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

通过 setNX 获取锁,如果成功了则更新缓存然后删除锁。其实这里有一个严重的问题:如果更新缓存的时候因为某些原因意外退出了,那么这个锁就不会被删除而一直存在,以至于缓存再也得不到更新。为了解决这个问题有人可能会想到给锁设置一个过期时间,如下

$redis->multi(); 
$redis->setNX($key, $value); 
$redis->expire($key, $ttl); 
$redis->exec(); ?>
  • 1
  • 2
  • 3
  • 4

因为 setNX 不具备设置过期时间的功能,所以要借助 Expire 来设置,同时需要使用 Multi/Exec 来确保请求的原子性,以免 setNX 成功了 Expire 却失败了。(问题就在于setnx和expire中间如果遇到crash等行为,可能这个lock就不会被释放了。于是进一步的优化方案也可在lock中存储timestamp。判断timestamp的长短。)

这样还有问题:当多个请求到达时,虽然只有一个请求的 setNX 可以成功,但是任何一个请求的 Expire 却都可以成功,这就意味着即便获取不到锁也可以刷新过期时间,导致锁一直有效,还是解决不了上面的问题。显然 setNX 满足不了需求。

增强的set(推荐使用)

Redis从 2.6.12 起,SET 涵盖了 SETEX 的功能, SET 本身又包含了设置过期时间的功能,所以使用 SET 就可以解决上面遇到的问题。

$rs = $redis->set($key, $value, array('nx', 'ex' => $ttl)); 
if ($rs) { 
	//处理更新缓存逻辑 
	// ...... 
	//删除锁 
	$redis->del($key); 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

防止锁被误删

到这一步其实还是有问题的,如果一个请求更新缓存的时间比锁的有效期还要长,导致在缓存更新过程中锁就失效了,此时另一个请求就会获取到锁,但前一个请求在缓存更新完毕的时候,直接删除锁的话就会出现误删其它请求创建的锁的情况。
(因为有可能导致误删别人的锁的情况。比如,这个锁我上了10s,但是我处理的时间比10s更长,到了10s,这个锁自动过期了,被别人取走了,并且对它重新上锁了。那么这个时候,我再调用Redis::del就是删除别人建立的锁了。)
所以要避免这种问题,可以在创建锁的时候需要引入一个随机值并在删除锁的时候加以判断(当然随机数字可能恰巧·· 后面可改用时间戳)

$rs = $redis->set($key, $random, array('nx', 'ex' => $ttl)); 
if ($rs) { 
	//处理更新逻辑 
	// ...... 
	//先判断随机数,是同一个则删除锁 
	if ($redis->get($key) == $random) { 
		$redis->del($key); 
	} 
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

防止处理超时锁过期

但是这里似乎还存在一个问题?比如插入操作 第一个插入10秒可能还没操作完,结果就到期了。这时候第二个请求获得了锁 进来了。结果重复插入。所以 这个时间应该设置的比操作长 保证能xxx

流程图

在这里插入图片描述

封装

lua脚本封装

这个解锁也可用lua脚本实现(似乎redis官方更推荐)
官方对解锁的命令也有建议,建议使用lua脚本,先进行get,再进行del

程序变成:

$token = rand(1, 100000);//可替换为 UUID

function lock() {
    return Redis::set("my:lock", $token, "nx", "ex", 10);
}

function unlock() {
    $script = `
if redis.call("get",KEYS[1]) == ARGV[1]
then
    return redis.call("del",KEYS[1])
else
    return 0
end    
    `
    return Redis::eval($script, "my:lock", $token)
}

//业务代码
if (lock()) {
    // do something
    unlock();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
//注 php如何运行lua脚本
$redis = new Redis(); 
// 编写lua 脚本
$lua = <<<SCRIPT
//lua脚本内容
SCRIPT; 

$s = $redis->eval($lua, array('name'),1);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

php版封装

//后续可考虑封装为函数 更方便
$redis = RedisService::getInstance(xxxx);//写库
$random = rand(0,99999);
$key = 'lock:bind:'.$m2;
$expire_time = 20;
$can_update = $redis->set($key, $random, array('nx', 'ex' =>$expire_time))
if ($can_update) {
	//处理更新逻辑
	//....
	//先判断随机数,是同一个则删除锁
	if ($redis->get($key) == $random) {
		$redis->del($key);
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

开源实现

在这里插入图片描述

watch乐观锁方案

watch就是乐观锁,也就是用时间戳的方式实现,不是真正的形成等待,而是时间戳不一致时(也就是有其他线程/进程/机器修改时),令自己的修改失败。setnx方式就是类似于悲观锁了,也就是拿不到就失败,不会等到修改的时候再失败(或者说,必须拿到锁才修改)。

Redis Watch 命令用于监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断
这个以后再更新

//ps 这里里面内容还没看 也不知道好不好
我们已经知道在一个事务中只有当所有命令都依次执行完后才能得到每个结果的返回值,可是有些情况下需要先获得一条命令的返回值,然后再根据这个值执行下一条命令。例如,介绍INCR命令时曾经说过使用GETSET命令自己实现incr函数会出现竞态条件,伪代码如下:

def incr($key)

    $value = GET $key

    if not $value

           $value = 0

    $value = $value + 1

    SET $key, $value

    return $value

肯定会有很多读者想到可以用事务来实现incr函数以防止竞态条件,可是因为事务中的每个命令的执行结果都是最后一起返回的,所以无法将前一条命令的结果作为下一条命令的参数,即在执行SET命令时无法获得GET命令的返回值,也就无法做到增1的功能了。

为了解决这个问题,我们需要换一种思路。即在 GET 获得键值后保证该键值不被其他客户端修改,直到函数执行完成后才允许其他客户端修改该键键值,这样也可以防止竞态条件。要实现这一思路需要请出事务家族的另一位成员:WATCHWATCH命令可以监控一个或多个键,一旦其中有一个键被修改(或删除),之后的事务就不会执行。监控一直持续到EXEC命令(事务中的命令是在EXEC之后才执行的,所以在MULTI命令后可以修改WATCH监控的键值),如:

redis> SET key 1

OK

redis> WATCH key

OK

redis> SET key 2

OK

redis> MULTI

OK

redis> SET key 3

QUEUED

redis> EXEC

(nil)

redis> GET key

"2"

上例中在执行WATCH命令后、事务执行前修改了key的值(即SET key 2),所以最后事务中的命令SET key 3没有执行,EXEC命令返回空结果。

学会了WATCH命令就可以通过事务自己实现incr函数了,伪代码如下:

def incr($key)

    WATCH $key

    $value = GET $key

     if not $value

            $value = 0

     $value = $value + 1

    MULTI

    SET $key, $value

     result = EXEC

    return result[0]

因为EXEC命令返回值是多行字符串类型,所以代码中使用result[0]来获得其中第一个结果。

提示  由于WATCH命令的作用只是当被监控的键值被修改后阻止之后一个事务的执行,而不能保证其他客户端不修改这一键值,所以我们需要在EXEC执行失败后重新执行整个函数。

执行EXEC命令后会取消对所有键的监控,如果不想执行事务中的命令也可以使用UNWATCH命令来取消监控。比如,我们要实现hsetxx函数,作用与HSETNX命令类似,只不过是仅当字段存在时才赋值。为了避免竞态条件我们使用事务来完成这一功能:

def hsetxx($key, $field, $value)

    WATCH $key

    $isFieldExists = HEXISTS $key, $field

    if $isFieldExists is 1

          MULTI

          HSET $key, $field, $value

          EXEC

    else

          UNWATCH

    return $isFieldExists

在代码中会判断要赋值的字段是否存在,如果字段不存在的话就不执行事务中的命令,但需要使用UNWATCH命令来保证下一个事务的执行不会受到影响。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100

Redis分布式集群锁

在Redis的分布式环境中,Redis 的作者提供了RedLock 的算法来实现一个分布式锁

加锁

  • 获取当前Unix时间,以毫秒为单位
  • 依次尝试从N个实例,使用相同的key和随机值获取锁。在步2,当向Redis设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则招时时间应该在5-50室秒之间,这样可以游免服务器端Redis已经持掉的情况下,客户端还在死死地等待响应结果,如果服务器挡没有在规定时日内响应,客户端应该尽快尝试另外一个Redis实例。
  • 客户端使用当前时间减去开始获取锁时间(步赚1记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是3个节点)的Redis节点都取到锁,并且使用的时问小于锁失效时问时,销才算获取成功。
  • 如果取到了锁,kev的真正有效时间等于有效时间减去获取锁所使用的时间(步骤3计算的结果)。
  • 如果因为某些原因,获取锁失败(没有在至少N/2+1个Redis实例取到锁或者取销时间已经超过了有效时间),客户端应该在所有的Redis实例上进行解锁 (即便某些Redis实例根本就没有加锁成功》。

解锁

  • 向所有的Redis实例发送释放锁命令即可,不用关心之前有没有从Redis实例成功获取到锁

  • 总结
    Java环境有 Redisson 可用于生产环境,但是分布式锁还是Zookeeper会比较好

redis官方推荐的分布式锁解决方案:https://blog.csdn.net/xiaohuihuiaz/article/details/124389898

高并发和主从同步场景的区别

本文前面讲的是高并发场景,而非主从同步场景。
主从同步相对好解决一些,redis的setex、get即可。
这里的主从同步是mysql的

比如,当用户绑定某台终端的时候,主库成功绑定了,但是用户刷新终端列表的时候,由于高峰期主从同步延迟问题,看不到这台终端。
用户没看到效果,又去扫码,造成重复绑定某台终端。

public function insertToDb($wxid,$m2){
    $redis = RedisService::getInstance(Constants::REDIS_WRITER, 'WX_MINIPROGRAM_RDS', 0, true);
    $key_db_defer = 'binddefer:' . $wxid . $m2;//
    if ($redis->get($key_db_defer) == 1) throw new Exception("已绑定,数据同步中,请稍后刷新查看");
    //插入逻辑
    //....
    if(插入成功) {
        $redis->setex($key_db_defer, 360, 1);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

场景上

  • 高并发是,保证读写是一个原子操作。
  • 延迟的情况是,主库插入了,从库还没查到。即使读写是一个原子操作也没用。
    所以,在主redis上插入信息,数据库插入的时候查一下redis,已经插入就不插入了。设置个较长的过期时间就行和更新操作的时候删除锁。

具体操作上:

  • 高并发,还是运行你进行高频率操作,只是需要等前面的执行完了才行。所以,加锁和解锁在一个函数里。比如同一个操作中完成绑定、解绑操作。
  • 主从延迟是,就不允许你再操作了,除非你先完成其他的某个操作或者等待足够长时间后。**加锁和解锁不再同一次接口访问中(或者说就不解锁)。**比如你绑定了一台设备,所以在绑定设备的时候加锁。在你解绑的时候去掉锁(或者时间很长过期了)。

others

比如绑定逻辑。解绑的时候要宽容一点 不要设置del或者update的limit为1 可以多一点 或者不限制。因为可能读写冲突、主从同步问题造成insert了好几条的情况。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/641832
推荐阅读
相关标签
  

闽ICP备14008679号