当前位置:   article > 正文

Zookeeper Golang客户端:go-zookeeper的基本使用

Zookeeper Golang客户端:go-zookeeper的基本使用

Zookeeper Golang客户端:go-zookeeper的基本使用

1.连接到ZK server端

var hosts = []string{"localhost:8000"}//server端host
conn, _, err := zk.Connect(hosts, time.Second*5)
defer conn.Close()
	if err != nil {
		fmt.Println(err)
		return
	}
	
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2.增删改查

增加节点

var path="/test"
var data=[]byte("hello zk")
var flags=0
//flags有4种取值:
//0:永久,除非手动删除
//zk.FlagEphemeral = 1:短暂,session断开则改节点也被删除
//zk.FlagSequence  = 2:会自动在节点后面添加序号
//3:Ephemeral和Sequence,即,短暂且自动添加序号
var acls=zk.WorldACL(zk.PermAll)//控制访问权限模式

p,err_create:=conn.Create(path,data,flags,acls)
if err_create != nil {
	fmt.Println(err_create)
	return
}
fmt.Println("create:",p)

//输出 create:/test
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

删改与增不同在于其函数中的version参数,其中version是用于 CAS支持
可以通过此种方式保证原子性

func (c *Conn) Set(path string, data []byte, version int32) (*Stat, error)
func (c *Conn) Delete(path string, version int32) error
  • 1
  • 2

3.watch机制

Java API中是通过Watcher实现的,在go-zookeeper中则是通过Event。道理都是一样的
全局监听:
1.调用zk.WithEventCallback(callback)设置回调

//如下:
package main

import (
	"fmt"
	"github.com/samuel/go-zookeeper/zk"
	"time"
)

var hosts = []string{"localhost:8000"}

var path1 = "/whatzk"

var flags int32 = zk.FlagEphemeral
var data1 = []byte("hello,this is a zk go test demo!!!")
var acls = zk.WorldACL(zk.PermAll)

func main() {
	option := zk.WithEventCallback(callback)

	conn, _, err := zk.Connect(hosts, time.Second*5, option)
	defer conn.Close()
	if err != nil {
		fmt.Println(err)
		return
	}
	

	_, _, _, err = conn.ExistsW(path1)
	if err != nil {
		fmt.Println(err)
		return
	}

	create(conn, path1, data1)
	
	time.Sleep(time.Second * 2)

	_, _, _, err = conn.ExistsW(path1)
	if err != nil {
		fmt.Println(err)
		return
	}
	delete(conn, path1)


}

func callback(event zk.Event) {
	fmt.Println("*******************")
	fmt.Println("path:", event.Path)
	fmt.Println("type:", event.Type.String())
	fmt.Println("state:", event.State.String())
	fmt.Println("-------------------")
}

func create(conn *zk.Conn, path string, data []byte) {
	_, err_create := conn.Create(path, data, flags, acls)
	if err_create != nil {
		fmt.Println(err_create)
		return
	}
	
}

//输出:
*******************
path: 
type: EventSession
state: StateConnecting
-------------------
*******************
path: 
type: EventSession
state: StateConnected
-------------------
*******************
path: 
type: EventSession
state: StateHasSession
-------------------
*******************
path: /whatzk
type: EventNodeCreated
state: Unknown
-------------------
*******************
path: /whatzk
type: EventNodeDeleted
state: Unknown
-------------------
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91

部分监听:
1.调用conn.ExistsW(path) 或GetW(path)为对应节点设置监听,该监听只生效一次
2.开启一个协程处理chanel中传来的event事件
(注意:watchCreataNode一定要放在一个协程中,不能直接在main中调用,不然会阻塞main)

//部分代码如下:
func main() {
	conn, _, err := zk.Connect(hosts, time.Second*5)
	defer conn.Close()
	if err != nil {
		fmt.Println(err)
		return
	}
	

	_, _, ech, err := conn.ExistsW(path1)
	if err != nil {
		fmt.Println(err)
		return
	}
	go watchCreataNode(ech)
	create(conn, path1, data1)

	
}

func watchCreataNode(ech <-chan zk.Event){
	event:=<-ech
	fmt.Println("*******************")
	fmt.Println("path:", event.Path)
	fmt.Println("type:", event.Type.String())
	fmt.Println("state:", event.State.String())
	fmt.Println("-------------------")
}

//输出如下:
*******************
path: /whatyy
type: EventNodeCreated
state: Unknown
-------------------

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

注意:

1.如果即设置了全局监听有设置了部分监听,那么最终是都会触发的,并且全局监听在先执行
2.如果设置了监听子节点,那么事件的触发是先子节点后父节点

4.客户端随机hostname支持

ZK Java client端,相关链接:
(http://www.jianshu.com/p/1068d0896e65)
最终就是Round Robin策略

//使用步骤如下:(相关代码位于dnshostprovider.go中)

var hosts = []string{"host1:8000","host2:8000","host3:8000"}
hostPro:=new(zk.DNSHostProvider)
err:=hostPro.Init(hosts)//先初始化
if err != nil {
	fmt.Println(err)
	return
}
server,retryStart:=hostPro.Next()//获得host
...
hostPro.Connected()  //连接成功后会调用
}


//上面的一系列步骤都集成在func Connect(servers []string, sessionTimeout time.Duration, options ...connOption) (*Conn, <-chan Event, error)中
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/喵喵爱编程/article/detail/974810
推荐阅读
相关标签
  

闽ICP备14008679号