赞
踩
可用redis的pubsub机制来支持集群内的广播。
使用setnx命令:
SET lock_key random_value NX PX 5000
其中:
random_value
是客户端生成的唯一的字符串,用于在删除时唯一标识client身份。
NX
代表只在键不存在时,才对键进行设置操作。
PX 5000
设置键的过期时间为5000毫秒,用于异常情况下(如client崩溃),该锁可以被自动释放,供其他client使用。
若命令成功,说明获取到锁。否则,表示未获取到锁。
使用lua脚本:
-- 只有唯一字符串匹配,才能认定身份,执行删除操作
if redis.call('get',KEYS[1]) == ARGV[1] then
return redis.call('del',KEYS[1])
else
return 0
end
前面的内容只是为了说明基于redis分布式锁的实现原理,实际中,可直接使用redisson库的RLock,它在基本的加解锁之外,还提供了同一线程可重入持有锁的能力。
redisson库还提供了很多好用的分布式组件,值得好好学习一下。
信号量(semaphone):保证一个资源同一时间最多只能被N个进程访问
读写锁(rwmutex):同时只能有一个写者或多个读者访问资源,但不能同时既有读者又有写者
条件量(condition)
RPUSH key value [value …]
将一个或多个元素插入到列表key尾部
HINCRBY key field increment
为哈希表 key
中的域 field
的值加上增量 increment
,增量值可为负数
LPOP key
移除并返回列表 key
的头元素。
BLPOP key [key …] timeout
BLPOP是列表的阻塞式(blocking)弹出原语。
它是LPOP命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被BLPOP命令阻塞,直到等待超时或发现可弹出元素为止。
使用阻塞式的pop命令(blpop)可以实现阻塞式锁,使用普通的pop命令来实现非阻塞式锁。但是阻塞的方式是blocking在socket上(具体说来是阻塞在read系统调用上),会占用socket连接,当锁资源使用较多时,连接可能不够。
[lua] KEYS={sema}, ARGV={N}
if sismember DLockMgr.semaphores sema:
return true
sadd DLockMgr.semaphores sema
//初始化信号量
for i in 0..N:
rpush sema 1
return true
if blpop sema timeout:
# record which process hold this sema
hincrby process sema 1
return true
else:
return false
if lpop sema:
# record which process hold this sema
hincrby process sema 1
return true
else:
return false
if hexists process sema: lock_hold_num = hget process sema if lock_hold_num == 0: //just for safe hdel process sema return true #不能释放比lock_hold_num更多资源 n = N < lock_hold_num ? N : lock_hold_num for i in 0..n: rpush sema 1 if lock_hold_num == n: hdel process sema else: hincrby process sema -n return true else: #说明进程并未持有锁,不做任何事返回 return true
release(1)
srem DLockMgr.semaphores sema
del sema
释放一个进程持有的所有信号量,一般用于进程重启时。
[lua] KEYS={DLockMgr.svc.svc1}
for k,v in hgetall DLockMgr.svc.svc1:
if v > 0:
for i in 0..v:
rpush k 1
del DLockMgr.svc.svc1
使用二元信号量即可实现。
假设初始资源数为10,即同时最多允许10个读者
def acquire_read(): mutex.acquire() sema.acquire() mutex.release() def acquire_write(): mutex.acquire() for i in 0..10: sema.acquire() def release_read(): sema.release() def release_write(): sema.release(10) mutex.release()
有如下成员:
waiters_lock_.lock() waiters_++ waiters_lock_.unlock() external_mutex.unlock() sema_.acquire(timeout) waiters_lock_.lock() waiters_-- if was_broadcast_ and waiters_ == 0: last_waiter = True waiters_lock_.unlock() if last_waiter: waiters_done_.release() external_mutex.lock()
waiters_lock_.lock()
if waiters_ > 0:
have_waiters = True
was_broadcast_ = True
waiters_lock_.unlock()
if have_waiters:
sema_.release(waiters_)
waiters_done_.acquire()
was_broadcast_ = False
waiters_lock_.lock()
if waiters_ > 0:
have_waiters = True
waiters_lock_.unlock()
if have_waiters:
sema_.release(1)
分布式超时器可基于redisson的分布式延迟队列(distributed delay queue)来实现,在此基础上做一些高可用的封装。
整个分布式延迟队列用到了3个redis数据结构:
生产者进程做了两件事:
所以生产者是一个极其重要的角色。
消费者作用相对简单,就是消费到期消息。
我们介绍一下分布式超时器的设计思路。
首先,有这么一个原则:超时事件的设置进程不一定是该超时事件的响应进程,因为设置进程有可能在某个时刻恰好挂掉或重启,但超时事件的处理不能丢(确保高可用)。
其次,任何分布式组件的设计难点都在于分布式条件下的异常处理,分布式超时器也不例外,我想到的有如下异常点:
对上述问题,我们逐一讨论解决:
分布式组件有一个不那么常见的问题:
我们是将算法片段还是数据投递到每个工作节点?
像redisson的RScheduledExecutorService采用的是前者,它会把任务类(Runnable)的字节码存储在redis里,待分配到某个工作节点后,就在该节点上用classloader把任务类定义出来,再创建实例运行。
这种做法有它的优势,也有其不足。优点在于,如果数据量很大,投递算法片段更经济,而且设计出来的API会更直观。缺点在于,如果数据量不大,那算法片段的代价就有点高,更重要的,既然是代码,就会有周边依赖性、自身兼容性的问题。以分布式超时器为例,如果像RScheduledExecutorService那样投递代码,任务类对周边组件的依赖就要设计好,周边依赖要保证做到前向兼容;同时,版本升级若修改了任务类的实现,最好要把redis里老的字节码也更新掉,不然老的代码难以做到完善的后向兼容(即便能做到软件向后兼容,也不一定能做到业务上的向后兼容)。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。