当前位置:   article > 正文

一些常用的分布式组件设计技巧_redisson readmode

redisson readmode

广播

可用redis的pubsub机制来支持集群内的广播。

基于redis的分布式锁

加锁

使用setnx命令:

SET lock_key random_value NX PX 5000

其中:
random_value 是客户端生成的唯一的字符串,用于在删除时唯一标识client身份。
NX 代表只在键不存在时,才对键进行设置操作。
PX 5000 设置键的过期时间为5000毫秒,用于异常情况下(如client崩溃),该锁可以被自动释放,供其他client使用。

若命令成功,说明获取到锁。否则,表示未获取到锁。

解锁

使用lua脚本

-- 只有唯一字符串匹配,才能认定身份,执行删除操作
if redis.call('get',KEYS[1]) == ARGV[1] then 
   return redis.call('del',KEYS[1]) 
else
   return 0 
end
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

redisson库

前面的内容只是为了说明基于redis分布式锁的实现原理,实际中,可直接使用redisson库的RLock,它在基本的加解锁之外,还提供了同一线程可重入持有锁的能力。
redisson库还提供了很多好用的分布式组件,值得好好学习一下。

基于redis实现常用分布式组件

基本概念

信号量(semaphone):保证一个资源同一时间最多只能被N个进程访问

读写锁(rwmutex):同时只能有一个写者或多个读者访问资源,但不能同时既有读者又有写者

条件量(condition)

用到的redis命令

RPUSH key value [value …]

将一个或多个元素插入到列表key尾部

HINCRBY key field increment

为哈希表 key 中的域 field 的值加上增量 increment ,增量值可为负数

LPOP key

移除并返回列表 key 的头元素。

BLPOP key [key …] timeout

BLPOP是列表的阻塞式(blocking)弹出原语。

它是LPOP命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被BLPOP命令阻塞,直到等待超时或发现可弹出元素为止。

信号量

通过队列实现

使用阻塞式的pop命令(blpop)可以实现阻塞式锁,使用普通的pop命令来实现非阻塞式锁。但是阻塞的方式是blocking在socket上(具体说来是阻塞在read系统调用上),会占用socket连接,当锁资源使用较多时,连接可能不够。

getSemaphore
[lua] KEYS={sema}, ARGV={N}
if sismember DLockMgr.semaphores sema:
    return true
sadd DLockMgr.semaphores sema
//初始化信号量
for i in 0..N:    
    rpush sema 1
return true 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
def acquire(timeout):
if blpop sema timeout:    
    # record which process hold this sema
    hincrby process sema 1  
    return true
else:
    return false 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
def tryacquire():
if lpop sema:
    # record which process hold this sema
    hincrby process sema 1
    return true
else:
    return false   
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
def release(N):
if hexists process sema:
    lock_hold_num = hget process sema
    if lock_hold_num == 0: //just for safe
        hdel process sema
        return true
    #不能释放比lock_hold_num更多资源
    n = N < lock_hold_num ? N : lock_hold_num   
    for i in 0..n:  
        rpush sema 1  
    if lock_hold_num == n:
        hdel process sema
    else:    
        hincrby process sema -n
    return true    
else:
    #说明进程并未持有锁,不做任何事返回
    return true         
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
release()
release(1)    
  • 1
remove()
srem DLockMgr.semaphores sema

del sema
  • 1
  • 2
  • 3
releaseAllLock

释放一个进程持有的所有信号量,一般用于进程重启时。

[lua] KEYS={DLockMgr.svc.svc1}
for k,v in hgetall DLockMgr.svc.svc1:
    if v > 0:
        for i in 0..v:
            rpush k 1
del DLockMgr.svc.svc1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

互斥锁

使用二元信号量即可实现。

读写锁

假设初始资源数为10,即同时最多允许10个读者

def acquire_read(): 
    mutex.acquire()
    sema.acquire()
    mutex.release()

def acquire_write(): 
    mutex.acquire()
    for i in 0..10:       
        sema.acquire()        

def release_read(): 
    sema.release()
    
def release_write(): 
    sema.release(10)
    mutex.release()    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

Condition实现

cond_init

有如下成员:

  • waiters_ = 0 等待线程数
  • was_broadcast_ = False 是否广播
  • sema_ 等待信号量,init=0
  • waiters_lock_ 用于保障waiters_原子操作的线程锁
  • waiters_done_ 控制所有等待线程结束的信号量,init=0

cond_wait

waiters_lock_.lock()
waiters_++
waiters_lock_.unlock()

external_mutex.unlock()

sema_.acquire(timeout)

waiters_lock_.lock()
waiters_--
if was_broadcast_ and waiters_ == 0:
    last_waiter = True
waiters_lock_.unlock()

if last_waiter:
    waiters_done_.release()

external_mutex.lock()    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

cond_broadcast

waiters_lock_.lock()
if waiters_ > 0:
    have_waiters = True
    was_broadcast_ = True
waiters_lock_.unlock()

if have_waiters:
    sema_.release(waiters_)
    waiters_done_.acquire()
    was_broadcast_ = False
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

cond_signal

waiters_lock_.lock()
if waiters_ > 0:
    have_waiters = True
waiters_lock_.unlock()

if have_waiters:
    sema_.release(1)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

分布式超时器

分布式超时器可基于redisson的分布式延迟队列(distributed delay queue)来实现,在此基础上做一些高可用的封装。

redisson分布式延迟队列原理

整个分布式延迟队列用到了3个redis数据结构:

  • 时间最小堆,列头就是最近即将到期的消息
  • 顺序消息队列,按设置的先后顺序排列的消息
  • 目标消息队列,已到期的消息,待消费者拉取

在这里插入图片描述

生产者进程做了两件事:

  1. 往时间最小堆和顺序消息队列插入消息
  2. 定时从时间最小堆和顺序队列里把到期的消息移到目标队列

所以生产者是一个极其重要的角色。

消费者作用相对简单,就是消费到期消息。

设计思路

我们介绍一下分布式超时器的设计思路。

首先,有这么一个原则:超时事件的设置进程不一定是该超时事件的响应进程,因为设置进程有可能在某个时刻恰好挂掉或重启,但超时事件的处理不能丢(确保高可用)。

其次,任何分布式组件的设计难点都在于分布式条件下的异常处理,分布式超时器也不例外,我想到的有如下异常点:

  • redis故障了怎么处理?比如断AZ演练
  • 超时事件的消费进程未处理完就崩溃,怎么办?
  • 超时事件的消费进程处理失败了,怎么办?
  • 所有进程都不在的情况下,超时事件会不会丢失,待进程起来后能不能补偿处理?
  • 灰度切换对超时事件的处理有无特殊要求?

对上述问题,我们逐一讨论解决:

  • redis断AZ的情况,由于我们的redis集群里的master和slave节点分属不同的AZ,理论上不会出现master和slave同时挂掉的情况,那么在master 挂掉时,集群能自动把老master带的slave推举为新master,同时我们把redisson的readmode设为MASTER_SLAVE,以确保在master选举过程中,也能正常读取redis(readmode=MASTER_SLAVE时redisson在读master失败时会去读slave)。当然,由于主从复制的异步特点,新master选举出来后,小概率下可能存在数据丢失。这种情况下,可通过特殊机制去处理,比如提前将超时事件存入mysql,并定期对比mysql和redis里的事件,redis若没有就用mysql里的事件补偿。
  • 消费超时事件的进程半途崩溃,超时事件未处理完。消费进程在处理之初可申请资源锁,处理结束释放资源锁,若资源锁并非正常释放,而是过期触发的释放,说明该超时事件可能需要重新处理。
  • 消费超时事件失败了,建议应用层自行捕捉异常并重试,框架层暂不考虑失败重试机制,以免实现过于复杂。
  • 如前所述,redisson的分布式延迟队列内部使用了三个专门的redis数据结构来存放未到期和到期的超时事件,所以,如果所有进程不在,这些超时事件还会保留在redis里,待进程恢复后,可以继续处理。
  • 关于灰度,考虑到我们的实际情况是:新版本作为灰度存在只有很短的一段时间,大部分时间都是老版本作为灰度存在,分布式超时器框架将不让灰度节点来处理超时事件(毕竟代码要做到后向兼容不是一件易事)

讨论:投递算法还是投递数据?

分布式组件有一个不那么常见的问题:

我们是将算法片段还是数据投递到每个工作节点?

像redisson的RScheduledExecutorService采用的是前者,它会把任务类(Runnable)的字节码存储在redis里,待分配到某个工作节点后,就在该节点上用classloader把任务类定义出来,再创建实例运行。

这种做法有它的优势,也有其不足。优点在于,如果数据量很大,投递算法片段更经济,而且设计出来的API会更直观。缺点在于,如果数据量不大,那算法片段的代价就有点高,更重要的,既然是代码,就会有周边依赖性、自身兼容性的问题。以分布式超时器为例,如果像RScheduledExecutorService那样投递代码,任务类对周边组件的依赖就要设计好,周边依赖要保证做到前向兼容;同时,版本升级若修改了任务类的实现,最好要把redis里老的字节码也更新掉,不然老的代码难以做到完善的后向兼容(即便能做到软件向后兼容,也不一定能做到业务上的向后兼容)。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/523935
推荐阅读
相关标签
  

闽ICP备14008679号