当前位置:   article > 正文

consul实现分布式锁

consul 分布式锁

分布式一致性问题:

分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。”所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可
在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务分布式锁等。有的时候,我们需要保证一个方法在同一时间内只能被同一个线程执行


分布式锁

是在分布式系统之间同步访问共享资源的一种方式。
不同的系统或者同一个系统不同的主机共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰,从而保证数据的一致性,怎么保证数据的一致性,就用到了分布式锁


那么用锁来解决资源抢占时,又有哪些问题:

1、死锁

死锁是在并发编程中理论上都会出现的问题

什么是死锁:
抢占资源的各方,彼此都在等待对方释放资源,以便自己能获取系统资源,但是没有哪一方退出,这时候就死锁了

产生死锁的4个条件:

  • 互斥条件
  • 不可抢占条件
  • 占用并申请条件
  • 循环等待条件

解决方法:
解决死锁的问题只要解决了上面的4个条件之一即可。那怎么解决呢?
一般用 session + TTL 打破循环等待条件

当一个客户端尝试操作一把分布式锁的时候,我们必须校验其 
session 是否为锁的拥有者,不是则无法进行操作。
当一个客户端已经持有一把分布式锁后,发生了
掉线,在超出了 TTL** 时间后无法连接上,则回收其锁的拥有权。

2、惊群效应

什么是惊群效应?
简单说来,多线程/多进程等待同一个socket事件,当这个事件发生时,这些线程/进程被同时唤醒,就是惊群。可以想见,效率很低下,许多进程被内核重新调度唤醒,同时去响应这一个事件,当然只有一个进程能处理事件成功,其他的进程在处理该事件失败后重新休眠(也有其他选择)。这种性能浪费现象就是惊群。

为了更好的理解何为惊群,举一个很简单的例子,当你往一群鸽子中间扔一粒谷子,所有的各自都被惊动前来抢夺这粒食物,但是最终注定只可能有一个鸽子满意的抢到食物,没有抢到的鸽子只好回去继续睡觉,等待下一粒谷子的到来。这里鸽子表示进程(线程),那粒谷子就是等待处理的事件

解决方法:
为了避免发生惊群效应, NginxZooKeeper 分别使用了不同的方案解决,但是他们的核心解决思路都是一致的,

下面我们来看看 ZooKeeper 是怎么解决 惊群效应 的
我们都知道,在 ZooKeeper 内部会使用临时目录节点的形式创建分布式锁,其中每个节点对应一个客户端的申请锁请求。

当客户端来请求该锁的时候, ZooKeeper 会生成一个 ${lock-name}-${i} 的临时目录,此后每次请求该锁的时候,就会生成 ${lock-name}-${i+1} 的目录,如果此时在 ${lock-name} 中拥有最小的 i 的客户端会获得该锁,而该客户端使用结束之后,就会删除掉自己的临时目录,并通知后续节点进行锁的获取。

没错,这个 iZooKeeper 解决惊群效应的利器,它被称为 顺序节点


Nginx怎么解决惊群
Nginx中处理epoll惊群问题的思路很简单,多个子进程有一个锁,谁拿到锁,谁才将accept的fd加入到epoll队列中,其他的子进程拿不到锁,也就不会将fd加入到epoll中,连接到来也就不会导致所有子进程的epoll被唤醒返回

3、脑裂(brain-split)

什么是脑裂?
脑裂主要是仲裁之间网络中断或不稳定导致

当集群中出现 脑裂 的时候,往往会出现多个 master 的情况,这样数据的一致性会无法得到保障,从而导致整个服务无法正常运行

解决方法:

  1. 可以将集群中的服务作为 P2P 节点,避免 Leader 与 Salve 的切换
  2. 向客户端发起重试,如果一段时间后依然无法连接上,再让下一个顺序客户端获取锁



consul怎么解决上面3个问题

Consul 是 Go 实现的一个轻量级 服务发现 、KV存储 的工具,它通过强一致性的KV存储实现了简易的 分布式锁 ,下面我们根据源码看下 Consul 是怎么解决以上分布式锁的难点的

  1. // api/lock.go
  2. // Lock 分布式锁数据结构
  3. type Lock struct {
  4. c *Client // 提供访问consul的API客户端
  5. opts *LockOptions // 分布式锁的可选项
  6. isHeld bool // 该锁当前是否已经被持有
  7. sessionRenew chan struct{} // 通知锁持有者需要更新session
  8. locksession string // 锁持有者的session
  9. l sync.Mutex // 锁变量的互斥锁
  10. }
  11. // LockOptions 提供分布式锁的可选项参数
  12. type LockOptions struct {
  13. Key string // 锁的 Key,必填项,且必须有 KV 的写权限
  14. Value []byte // 锁的内容,以下皆为选填项
  15. Session string // 锁的session,用于判断锁是否被创建
  16. SessionOpt *SessionEntry // 自定义创建session条目,用于创建session,避免惊群
  17. SessionName string // 自定义锁的session名称,默认为 "Consul API Lock"
  18. SessionTTL string // 自定义锁的TTL时间,默认为 "15s"
  19. MonitorRetries int // 自定义监控的重试次数,避免脑裂问题
  20. MonitorRetryTime time.Duration // 自定义监控的重试时长,避免脑裂问题
  21. LockWaitTime time.Duration // 自定义锁的等待市场,避免死锁问题
  22. LockTryOnce bool // 是否只重试一次,默认为false,则为无限重试
  23. }

惊群

SessionOpt       *SessionEntry // 自定义创建session条目,用于创建session,避免惊群

死锁

LockWaitTime     time.Duration // 自定义锁的等待市场,避免死锁问题

脑裂

MonitorRetries   int           // 自定义监控的重试次数,避免脑裂问题
MonitorRetryTime time.Duration // 自定义监控的重试时长,避免脑裂问题
LockTryOnce      bool               // 是否只重试一次,默认为false,则为无限重试

从 LockOptions 中带有 session / TTL / monitor / wait 等字眼的成员变量可以看出,consul 已经考虑到解决我们上一节提到的三个难点,下面来看看实现代码中是如何使用的

先来看看生成可用的分布式锁的函数 LockOpts :

  1. // api/lock.go
  2. // LockOpts 通过传入锁的参数,返回一个可用的锁
  3. // 必须注意的是 opts.Key 必须在 KV 中有写权限
  4. func (c *Client) LockOpts(opts *LockOptions) (*Lock, error) {
  5. if opts.Key == "" {
  6. return nil, fmt.Errorf("missing key")
  7. }
  8. if opts.SessionName == "" {
  9. opts.SessionName = DefaultLockSessionName // "Consul API Lock"
  10. }
  11. if opts.SessionTTL == "" {
  12. opts.SessionTTL = DefaultLockSessionTTL // "15s"
  13. } else {
  14. if _, err := time.ParseDuration(opts.SessionTTL); err != nil {
  15. return nil, fmt.Errorf("invalid SessionTTL: %v", err)
  16. }
  17. }
  18. if opts.MonitorRetryTime == 0 {
  19. opts.MonitorRetryTime = DefaultMonitorRetryTime // 2 * time.Second
  20. }
  21. if opts.LockWaitTime == 0 {
  22. opts.LockWaitTime = DefaultLockWaitTime // 15 * time.Second
  23. }
  24. l := &Lock{
  25. c: c,
  26. opts: opts,
  27. }
  28. return l, nil
  29. }



我们可以在这个函数中可以注意到:

  • 15s 的 SessionTTL 用于解决死锁、脑裂问题。
  • 2s 的 MonitorRetryTime 是一个长期运行的协程用于监听当前锁持有者,用于解决脑裂问题。
  • 15s 的 LockWaitTime 用于设置尝试获取锁的超时时间,用于解决死锁问题。

Lock 有3个可供其他包访问的函数,分别为 Lock / Unlock / Destroy ,下面按照顺序展开细说
Lock()函数

  1. // Lock尝试获取一个可用的锁,可以通过一个非空的 stopCh 来提前终止获取
  2. // 如果返回的锁发生异常,则返回一个被关闭的 chan struct ,应用程序必须要处理该情况
  3. func (l *Lock) Lock(stopCh <-chan struct) (<-chan struct{}, error) {
  4. // 先锁定本地互斥锁
  5. l.l.Lock()
  6. defer l.l.Unlock()
  7. // 本地已经获取到分布式锁了
  8. if l.isHeld {
  9. return nil, ErrLockHeld
  10. }
  11. // 检查是否需要创建session
  12. l.lockSession = l.opts.Session
  13. if l.lockSession == "" {
  14. s, err := l.createSession()
  15. if err != nil {
  16. return nil, fmt.Errorf("failed to create session: %v", err)
  17. }
  18. l.sessionRenew = make(chan struct{})
  19. l.lockSession = s
  20. session := l.c.Session()
  21. go session.RenewPeriodic(l.opts.SessionTTL, s, nil, l.sessionRenew)
  22. // 如果我们无法锁定该分布式锁,清除本地session
  23. defer func() {
  24. if !l.isHeld {
  25. close(l.sessionRenew)
  26. l.sessionRenew = nil
  27. }
  28. }()
  29. // 准备向consul KV发送查询锁操作的参数
  30. kv := l.c.KV()
  31. qOpts := &QueryOptions{
  32. WaitTime: l.opts.LockWaitTime,
  33. }
  34. start := time.Now()
  35. attempts := 0
  36. WAIT:
  37. // 判断是否需要退出锁争夺的循环
  38. select {
  39. case <-stopCh:
  40. return nil, nil
  41. default:
  42. }
  43. // 处理只重试一次的逻辑
  44. if l.opts.LockTryOnce && attempts > 0 { // 配置该锁只重试一次且已经重试至少一次了
  45. elapsed := time.Since(start) // 获取当前时间偏移量
  46. if elapsed > qOpts.WaitTime { // 当超过设置中的剩余等待时间
  47. return nil, nil // 返回空结果
  48. }
  49. qOpts.WaitTime -= elapsed // 重设剩余等待时间
  50. }
  51. attempts++ // 已尝试次数自增1
  52. // 阻塞查询该存在的分布式锁,直至无法获取成功
  53. pair, meta, err := kv.Get(l.opts.Key, qOpts)
  54. if err != nil {
  55. return nil, fmt.Errorf("failed to read lock: %v", err)
  56. }
  57. }
  58. }

Unlock()函数

  1. // Unlock 尝试释放 consul 分布式锁,如果发生异常则返回 error
  2. func (l *Lock) Unlock() error {
  3. // 在释放锁之前必须先把 Lock 结构锁住
  4. l.l.Lock()
  5. defer l.l.Unlock()
  6. // 确认我们依然持有该锁
  7. if !isHeld {
  8. return ErrLockNotHeld
  9. }
  10. // 提前先将锁的持有权释放
  11. l.isHeld = false
  12. // 清除刷新 session 通道
  13. if l.sessionRenew != nil {
  14. defer func() {
  15. close(l.sessionRenew)
  16. l.sessionRenew = nil
  17. }()
  18. }
  19. // 获取当前 session 持有的锁信息
  20. lockEnt := l.lockEntry(l.lockSession)
  21. l.lockSession = ""
  22. kv := l.c.KV()
  23. _, _, err := kv.Release(lockEnt, nil) // 将持有的锁尝试释放
  24. if err != nil {
  25. return fmt.Errorf("failed to release lock: %v", err)
  26. }
  27. return nil
  28. }

Destry()函数

  1. // Destroy 尝试销毁分布式锁,虽然这不是必要的操作。
  2. // 如果该锁正在被使用,则返回error
  3. func (l *Lock) Destroy() error {
  4. // 在释放锁之前必须先把 Lock 结构锁住
  5. l.l.Lock()
  6. defer l.l.Unlock()
  7. // 确认我们依然持有该锁
  8. if !isHeld {
  9. return ErrLockNotHeld
  10. }
  11. // 获取锁
  12. kv := l.c.KV()
  13. pair, _, err := kv.Get(l.opts.Key, nil)
  14. if err != nil {
  15. return fmt.Errorf("failed to read lock: %v", err)
  16. }
  17. if pair == nil {
  18. return nil
  19. }
  20. // 检查是否有可能状态冲突
  21. if pair.Flags != LockFlagValue {
  22. return ErrLockConflict
  23. }
  24. // 如果锁正在被持有,则返回异常
  25. if pair.Session != "" {
  26. return ErrLockUse
  27. }
  28. // 尝试通过 CAS 删除分布式锁
  29. didRemove, _, err := kv.DeleteCAS(pair, nil)
  30. if err != nil {
  31. return fmt.Errorf("failed to remove lock: %v", err)
  32. }
  33. if !didRemove { // 如果没有删除成功,则返回异常
  34. return ErrLockInUse
  35. }
  36. return nil
  37. }

用golang实现的小demo

  1. package main
  2. import (
  3. api "github.com/hashicorp/consul/api"
  4. "github.com/satori/go.uuid"
  5. "log"
  6. )
  7. func main() {
  8. client, err := api.NewClient(&api.Config{
  9. Address: "127.0.0.1:8500",
  10. })
  11. lockKey := "demo-lock-key"
  12. lock, err := client.lockOpts(&api.LockOptions{
  13. Key: lockKey,
  14. Value: []byte("sender 1"),
  15. SessionTTL: "10s",
  16. SessionOpts: &spi.SessionEntry{
  17. Checks: []string{"check1", "check2"},
  18. Behavior: "release",
  19. },
  20. SessionName: uuid.Must(uuid.NewV4()),
  21. })
  22. if err != nil {
  23. log.Fatalf("failed to created lock %v", err)
  24. }
  25. result, err := lock.Lock(nil)
  26. if err != nil {
  27. log.Fatalf("failed to accquired lock")
  28. }
  29. }

参考:  https://www.jianshu.com/p/44307a394fe1,特别感谢

转载于:https://www.cnblogs.com/jiujuan/p/10527786.html

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号