当前位置:   article > 正文

浅谈分布式锁_分布式任务调度 使用分布式锁

分布式任务调度 使用分布式锁

为什么需要锁

单机程序,在多线程并发情况下,操作同一资源时,需要对其进行加锁等同步措施来保证原子性。举一个多线程自增的例子:

package main
import (
   "sync"
)
// 全局变量
var counter int
func main() {
   var wg sync.WaitGroup
   for i := 0; i < 100; i++ {
       wg.Add(1)
       go func() {
            counter++
            wg.Done()
       }()
   }
   wg.Wait()
   println(counter)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

多次运行会得到不同的结果:

> go run test.go
98
> go run test.go
99
> go run test.go
100
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

显然这个结果不能让人满意,充满了不可预知。想要得到正确结果,就需要对计数自增加锁

package main

import (
    "sync"
)

// 全局变量
var counter int
var mtx sync.Mutex

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            mtx.Lock()
            counter++
            mtx.Unlock()
            wg.Done()
        }()
    }
    wg.Wait()
    println(counter)
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

多次运行后得到的结果:

> go run test.go
100
> go run test.go
100
> go run test.go
100
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
一、基于Redis的setnx

在分布式场景下,我们也需要这种"抢占"的逻辑,这时候怎么办?我们可以使用Redis提供的setnx命令:

package main

import (
    "fmt"
    "strconv"
    "sync"
    "time"

    "gopkg.in/redis.v5"
)

var rds = redis.NewFailoverClient(&redis.FailoverOptions{
    MasterName:    "mymaster",
    SentinelAddrs: []string{"127.0.0.1:26379"},
})

// 全局变量
func incrby() error {

    lockkey := "count_key"
    counterkey := "counter"

    succ, err := rds.SetNX(lockkey, 1, time.Second*time.Duration(5)).Result()
    if err != nil || !succ {
        fmt.Println(err, " lock result:", succ)
        return err
    }

    defer func() {
        succ, err := rds.Del(lockkey).Result()
        if err == nil && succ > 0 {
            fmt.Println("unlock sucess")
        } else {
            fmt.Println("unlock failed, err=", err)
        }
    }()

    resp, err := rds.Get(counterkey).Result()
    if err != nil && err != redis.Nil {
        fmt.Println("get count failed, err=", err)
        return err
    }

    var cnt int64
    if err == nil {
        cnt, err = strconv.ParseInt(resp, 10, 64)
        if err != nil {
            fmt.Println("parse string failed, s=", resp)
            return err
        }
    }
    fmt.Println("curr cnt:", cnt)
    cnt++
    _, err = rds.Set(counterkey, cnt, 0).Result()
    if err != nil {
        fmt.Println("set value fialed,err=", err)
        return err
    }

    return nil
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            incrby()
        }()
    }
    wg.Wait()
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74

运行结果:

> go run test.go
curr cnt: 0
<nil>  lock result: false
unlock sucess
<nil>  lock result: false
curr cnt: 1
<nil>  lock result: false
unlock sucess
curr cnt: 2
<nil>  lock result: false
unlock sucess
curr cnt: 3
<nil>  lock result: false
<nil>  lock result: false
unlock sucess
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

远程调用setnx运行流程上和单机的trylock非常相似,如果获取锁失败,那么相关的任务逻辑就不会继续向下执行。
setnx很适合在高并发场景下,来争抢一些唯一的资源。

二、基于zookeeper
package main 
import (
    "fmt"
    "sync"
    "time"

    "github.com/samuel/go-zookeeper/zk"
)

var zkconn *zk.Conn
var count int64

func incrby() {
    lock := zk.NewLock(zkconn, "/lock", zk.WorldACL(zk.PermAll))
    err := lock.Lock()
    if err != nil {
        panic(err)
    }
    count++
    lock.Unlock()
}

func main() {
    c, _, err := zk.Connect([]string{"127.0.0.1"}, time.Second)
    if err != nil {
        fmt.Println("connect zookeeper failed, err=", err)
        return
    }
    zkconn = c
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            incrby()
        }()
    }
    wg.Wait()
    fmt.Println(" cnt :", count)
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

运行结果:

$ > go run test.go 
Connected to 127.0.0.1:2181
authenticated: id=72138376348368897, timeout=4000
re-submitting `0` credentials after reconnect
cnt : 10
  • 1
  • 2
  • 3
  • 4
  • 5

基于ZooKeeper的锁与基于Redis锁不同之处在于lock成功之前会一直阻塞,这与sync.Mutex的Lock方法类似。
其原理是基于临时Sequence节点和watch API,例如我们这里使用的是/lock节点。Lock会在该节点下的节点列中插入自己的值,只要节点下的子节点发生变化,就会通知所有watch该节点的程序。这时候程序会检查当前节点下最小的子节点的id是否与自己的一致,一致则说明加锁成功了。
这种分布式的阻塞锁比较适合分布式任务调度场景,但不适合高频次持锁时间短的抢锁场景。按照Google的Chubby论文里的阐述,基于强一致协议的锁适用于 粗粒度的加锁操作。这里的粗粒度指锁占用时间较长。我们在使用时也应思考在自己的业务场景中使用是否合适

三、基于etcd

这个etcd的包"github.com/zieckey/etcdsync"拉取go mod会出现两次问题

#第一次
/etcd imports
        github.com/coreos/etcd/clientv3 tested by
        github.com/coreos/etcd/clientv3.test imports
        github.com/coreos/etcd/auth imports
        github.com/coreos/etcd/mvcc/backend imports
        github.com/coreos/bbolt: github.com/coreos/bbolt@v1.3.5: parsing go.mod:
        module declares its path as: go.etcd.io/bbolt
                but was required as: github.com/coreos/bbolt
#第二次
imports
        google.golang.org/grpc/naming: module google.golang.org/grpc@latest found (v1.32.0), but does not contain package google.golang.org/grpc/naming
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

需要在go.mod中加上

replace (
    github.com/coreos/bbolt v1.3.4 => go.etcd.io/bbolt v1.3.4
    go.etcd.io/bbolt v1.3.4 => github.com/coreos/bbolt v1.3.4
    google.golang.org/grpc => google.golang.org/grpc v1.26.0
)
  • 1
  • 2
  • 3
  • 4
  • 5
import (
    "log"

    "github.com/zieckey/etcdsync"
)

func main() {
    m, err := etcdsync.New("/lock", 10, []string{"http://127.0.0.1:2379"})
    if m == nil || err != nil {
        log.Printf("etcdsync.New failed")
        return
    }
    err = m.Lock()
    if err != nil {
        log.Println("etcdsync.Lock failed, err=", err)
        return
    }
    log.Printf("etcdsync.Lock OK")
    log.Printf("Get the lock. Do something here.")
    err = m.Unlock()
    if err != nil {
        log.Println("etcdsync.Unlock failed, err=", err)
    } else {
        log.Printf("etcdsync.Unlock OK")
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

etcd中没有像ZooKeeper那样的Sequence节点。所以其锁实现和基于ZooKeeper实现的有所不同。在上述示例代码中使用的etcdsync的Lock流程是:

  • 1、先检查/lock路径下是否有值,如果有值,说明锁已经被别人抢了
  • 2、如果没有值,那么写入自己的值。写入成功返回,说明加锁成功。写入时如果节点被其他节点写入过了,那么会导致加锁失败。
  • 3、watch /lock下的事件,此时陷入阻塞
  • 4、当/lock路径下发生事件时,当前进程被唤醒。检查发生的事件是否是删除事件(说明锁被持有者主动unlock),或者过期事件(锁过期失效),如果是的话,回到1,走抢锁流程。
如何选择合适的锁
单机量级

业务还在单机就可以搞定的量级下,那么按照需求使用任意的单机锁方案就可以。

分布式量级
  • 量级较低
    如果发展到分布式服务阶段,但业务规模不大,QPS很小的情况下,使用哪种锁的方案都差不多。如果公司内部有可以使用的ZooKeeper、etcd或者Redis集群,那么尽量不引入新的技术栈。
  • 量级较高
    如果锁是在任务恶劣的条件下都不允许数据丢失,那么就不能用Redis的setnx的简单锁。
    对锁数据的可靠性要求极高的话,那只能使用etcd或者ZooKeeper这种通过一致性协议保证数据可靠性的锁方案。(但可靠的背后往往都是较低的吞吐量和较高的延迟。需要根据业务的量级对其进行压力测试,以确保分布式锁所使用的的etcd和ZooKeeper集群可以承受得住实际的业务请求压力。
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/776230
推荐阅读
相关标签
  

闽ICP备14008679号