赞
踩
客户端服务发现(Client-side Service Discovery)是一种微服务架构中的模式,用于让客户端应用动态地发现并调用其他服务的实例,而无需通过一个中介(例如负载均衡器或服务网关)。它通常用于分布式系统中,通过客户端直接决定并选择与哪个服务实例通信,从而实现服务发现和负载均衡。
服务端服务发现(Server-side Service Discovery)是另一种服务发现模式,与客户端服务发现相对。在这种模式中,服务的实例发现和负载均衡由服务端组件处理,客户端只需将请求发送给一个固定的入口点(如负载均衡器或 API 网关),由这个入口点负责将请求路由到合适的服务实例。
Apache ZooKeeper 是一个用于分布式系统中的协调服务。它提供了一套高效、可靠的分布式协调工具,用于实现服务注册、配置管理、同步、领导者选举等功能。ZooKeeper 的设计初衷是简化分布式应用中的协调任务,从而使应用开发更容易。
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
bin/zkServer.sh start
package main

import (
	"fmt"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

// host lists the ZooKeeper servers the demo connects to.
var host = []string{"127.0.0.1:2181"}

// main walks through basic node operations against ZooKeeper: create, read,
// update, delete, existence check, child creation and child listing. Most
// failures are printed and the demo continues; a failed connect panics and a
// failed initial read aborts the run.
func main() {
	// Connect to the ZooKeeper servers.
	conn, _, err := zk.Connect(host, 5*time.Second)
	if err != nil {
		panic(err)
	}
	// Release the session when the demo finishes.
	defer conn.Close()

	// Create the node.
	if _, err := conn.Create("/test_tree2", []byte("tree_content"), 0, zk.WorldACL(zk.PermAll)); err != nil {
		fmt.Println("create err", err)
	}

	// Read the node back; the returned stat carries the version used below.
	nodeValue, dStat, err := conn.Get("/test_tree2")
	if err != nil {
		fmt.Println("get err", err)
		return
	}
	fmt.Println("nodeValue", string(nodeValue))

	// Update: the version obtained from the read is passed to Set.
	if _, err := conn.Set("/test_tree2", []byte("new_content"), dStat.Version); err != nil {
		fmt.Println("update err", err)
	}

	// Delete: re-read first because the update above changed the version.
	// If the read fails, skip the delete instead of using a version that was
	// never fetched.
	_, dStat, err = conn.Get("/test_tree2")
	if err != nil {
		fmt.Println("get err", err)
	} else if err := conn.Delete("/test_tree2", dStat.Version); err != nil {
		fmt.Println("Delete err", err)
	}

	// Check whether the node still exists.
	hasNode, _, err := conn.Exists("/test_tree2")
	if err != nil {
		fmt.Println("Exists err", err)
	}
	fmt.Println("node Exist", hasNode)

	// Create the node again so a child can be attached to it.
	if _, err := conn.Create("/test_tree2", []byte("tree_content"), 0, zk.WorldACL(zk.PermAll)); err != nil {
		fmt.Println("create err", err)
	}

	// Create a child node; this fails if the parent node does not exist.
	if _, err := conn.Create("/test_tree2/subnode", []byte("node_content"), 0, zk.WorldACL(zk.PermAll)); err != nil {
		fmt.Println("create err", err)
	}

	// List the children of the node.
	childNodes, _, err := conn.Children("/test_tree2")
	if err != nil {
		fmt.Println("Children err", err)
	}
	fmt.Println("childNodes", childNodes)
}
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/e421083458/gateway_demo/proxy/zookeeper"
)

var addr = "127.0.0.1:2002"

// main registers a few server entries under /real_server, prints the current
// server list, then watches that path and prints every change or watch error
// until the process receives SIGINT or SIGTERM.
func main() {
	// Connect to ZooKeeper through the project's manager wrapper.
	zkManager := zookeeper.NewZkManager([]string{"127.0.0.1:2181"})
	zkManager.GetConnect()
	defer zkManager.Close()

	// Register the initial entries. Each error is checked individually so an
	// early failure is not overwritten by a later call.
	registrations := []struct{ path, host string }{
		{"/real_server", "127.0.0.1"},
		{"/real_server/test", "127.0.0.1:8823"},
		{"/real_server/test2", "127.0.0.1:8823"},
	}
	for _, r := range registrations {
		if err := zkManager.RegistServerPath(r.path, r.host); err != nil {
			log.Println(err)
			return
		}
	}

	// Fetch and print the current server list.
	zlist, err := zkManager.GetServerListByPath("/real_server")
	fmt.Println("server node:")
	fmt.Println(zlist)
	if err != nil {
		log.Println(err)
	}

	// Watch the path and print every list change or error that arrives.
	chanList, chanErr := zkManager.WatchServerListByPath("/real_server")
	go func() {
		for {
			select {
			case changeErr := <-chanErr:
				fmt.Println("changeErr")
				fmt.Println(changeErr)
			case changedList := <-chanList:
				fmt.Println("watch node changed")
				fmt.Println(changedList)
			}
		}
	}()

	// After a short delay, register one more entry so the watcher above has
	// a change to report.
	time.Sleep(time.Second * 5)
	if err := zkManager.RegistServerPath("/real_server/test3", "127.0.0.2:8888"); err != nil {
		log.Println(err)
	}

	// Block until a termination signal arrives. The channel is buffered
	// because signal.Notify does not block when delivering a signal.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
}
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/e421083458/gateway_demo/proxy/zookeeper"
)

var addr = "127.0.0.1:2002"

// main registers an entry under /rs_server_conf, prints the server list and
// the node's data, watches the node's data for changes, then overwrites the
// data with addr after five seconds. It keeps running until the process
// receives SIGINT or SIGTERM.
func main() {
	// Connect to ZooKeeper through the project's manager wrapper.
	zkManager := zookeeper.NewZkManager([]string{"127.0.0.1:2181"})
	zkManager.GetConnect()
	defer zkManager.Close()

	// Register an entry.
	err := zkManager.RegistServerPath("/rs_server_conf", "192.168.1.101")
	if err != nil {
		fmt.Printf("2001:%v \n", err)
		return
	}

	// Fetch and print the server list.
	zlist, err := zkManager.GetServerListByPath("/rs_server_conf")
	fmt.Println("server node:")
	fmt.Println(zlist)
	if err != nil {
		log.Println(err)
	}

	// Fetch and print the node's data.
	zc, _, err := zkManager.GetPathData("/rs_server_conf")
	if err != nil {
		log.Println(err)
	}
	fmt.Println("get node data:")
	fmt.Println(string(zc))

	// Watch the node's data and print every change or error that arrives.
	dataChan, dataErrChan := zkManager.WatchPathData("/rs_server_conf")
	go func() {
		for {
			select {
			case changeErr := <-dataErrChan:
				fmt.Println("changeErr")
				fmt.Println(changeErr)
			case changedData := <-dataChan:
				fmt.Println("WatchGetData changed")
				fmt.Println(string(changedData))
			}
		}
	}()

	// After a short delay, modify the data. The node is re-read first
	// because SetPathData takes the version from the returned stat.
	time.Sleep(5 * time.Second)
	_, z, err := zkManager.GetPathData("/rs_server_conf")
	if err != nil {
		log.Println(err)
		return
	}
	err = zkManager.SetPathData("/rs_server_conf", []byte(addr), z.Version)
	if err != nil {
		// Printf, not Sprintf: the message must actually be written out.
		fmt.Printf("2002:%v \n", err)
		return
	}

	// Block until a termination signal arrives. The channel is buffered
	// because signal.Notify does not block when delivering a signal.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
}
观察者模式(Observer Pattern)是一种行为设计模式,它定义了一种一对多的依赖关系,使得多个观察者对象可以同时监听某一个主题对象。当这个主题对象的状态发生变化时,会通知所有观察者对象,使它们能够自动更新。观察者模式常用于实现事件处理系统,如用户界面事件、订阅/发布系统等。
package load_balance

import (
	"fmt"

	"github.com/e421083458/gateway_demo/proxy/zookeeper"
)

// LoadBalanceConf is the subject side of the observer pattern: it holds the
// load-balancing configuration and lets observers attach to it.
type LoadBalanceConf interface {
	Attach(o Observer)
	GetConf() []string
	WatchConf()
	UpdateConf(conf []string)
}

// LoadBalanceZkConf is a LoadBalanceConf whose active server list is read
// from, and kept up to date by watching, a ZooKeeper path.
//
// NOTE(review): activeList is written by the goroutine started in WatchConf
// and read by GetConf without synchronization; confirm that callers
// tolerate this.
type LoadBalanceZkConf struct {
	observers    []Observer
	path         string            // ZooKeeper path that is listed and watched
	zkHosts      []string          // ZooKeeper server addresses
	confIpWeight map[string]string // weight per server entry
	activeList   []string          // server entries currently listed under path
	format       string            // fmt pattern applied to each entry in GetConf
}

// Attach registers o so that it is notified on every configuration update.
func (s *LoadBalanceZkConf) Attach(o Observer) {
	s.observers = append(s.observers, o)
}

// NotifyAllObservers calls Update on every attached observer, in the order
// they were attached.
func (s *LoadBalanceZkConf) NotifyAllObservers() {
	for _, obs := range s.observers {
		obs.Update()
	}
}

// GetConf returns one "<formatted entry>,<weight>" string per active entry.
// Entries without a configured weight get the default weight "50".
func (s *LoadBalanceZkConf) GetConf() []string {
	confList := make([]string, 0, len(s.activeList))
	for _, ip := range s.activeList {
		weight, ok := s.confIpWeight[ip]
		if !ok {
			weight = "50" // default weight
		}
		confList = append(confList, fmt.Sprintf(s.format, ip)+","+weight)
	}
	return confList
}

// WatchConf opens its own ZooKeeper connection, watches s.path, and starts a
// goroutine that applies every reported server list through UpdateConf.
// Watch errors are only printed. The goroutine loops forever, so the
// deferred Close runs only if the goroutine is torn down.
func (s *LoadBalanceZkConf) WatchConf() {
	zkManager := zookeeper.NewZkManager(s.zkHosts)
	zkManager.GetConnect()
	fmt.Println("watchConf")
	chanList, chanErr := zkManager.WatchServerListByPath(s.path)
	go func() {
		defer zkManager.Close()
		for {
			select {
			case changeErr := <-chanErr:
				fmt.Println("changeErr", changeErr)
			case changedList := <-chanList:
				fmt.Println("watch node changed")
				s.UpdateConf(changedList)
			}
		}
	}()
}

// UpdateConf replaces the active list with conf and notifies all observers.
func (s *LoadBalanceZkConf) UpdateConf(conf []string) {
	s.activeList = conf
	s.NotifyAllObservers()
}

// NewLoadBalanceZkConf reads the current server list under path from the
// ZooKeeper servers in zkHosts, builds a LoadBalanceZkConf from it, and
// starts watching the path for changes. format is the fmt pattern applied to
// each entry by GetConf and conf maps entries to weights. It returns an
// error when the initial list cannot be read.
func NewLoadBalanceZkConf(format, path string, zkHosts []string, conf map[string]string) (*LoadBalanceZkConf, error) {
	// This connection is used only for the initial read; WatchConf opens
	// its own.
	zkManager := zookeeper.NewZkManager(zkHosts)
	zkManager.GetConnect()
	defer zkManager.Close()
	zlist, err := zkManager.GetServerListByPath(path)
	if err != nil {
		return nil, err
	}
	mConf := &LoadBalanceZkConf{format: format, activeList: zlist, confIpWeight: conf, zkHosts: zkHosts, path: path}
	mConf.WatchConf()
	return mConf, nil
}

// Observer is notified through Update whenever the configuration it is
// attached to changes.
type Observer interface {
	Update()
}

// LoadBalanceObserver is an Observer that prints the current configuration
// of ModuleConf on every update.
type LoadBalanceObserver struct {
	ModuleConf *LoadBalanceZkConf
}

// Update prints the configuration currently reported by ModuleConf.
func (l *LoadBalanceObserver) Update() {
	fmt.Println("Update get conf:", l.ModuleConf.GetConf())
}

// NewLoadBalanceObserver returns an observer bound to conf.
func NewLoadBalanceObserver(conf *LoadBalanceZkConf) *LoadBalanceObserver {
	return &LoadBalanceObserver{
		ModuleConf: conf,
	}
}
package main

import (
	"log"
	"net/http"

	"github.com/e421083458/gateway_demo/proxy/load_balance"
	"github.com/e421083458/gateway_demo/proxy/middleware"
	proxy2 "github.com/e421083458/gateway_demo/proxy/proxy"
)

// addr is the address the HTTP server listens on.
var addr = "127.0.0.1:2002"

// main builds a ZooKeeper-backed load-balancing configuration for
// /real_server, wraps it in a weighted round-robin balancer behind a reverse
// proxy, and serves that proxy on addr. It panics if the configuration
// cannot be created.
func main() {
	zkHosts := []string{"127.0.0.1:2181"}
	weights := map[string]string{"127.0.0.1:2003": "20"}

	zkConf, err := load_balance.NewLoadBalanceZkConf("http://%s/base", "/real_server", zkHosts, weights)
	if err != nil {
		panic(err)
	}

	balancer := load_balance.LoadBanlanceFactorWithConf(load_balance.LbWeightRoundRobin, zkConf)
	handler := proxy2.NewLoadBalanceReverseProxy(&middleware.SliceRouterContext{}, balancer)

	log.Println("Starting httpserver at " + addr)
	log.Fatal(http.ListenAndServe(addr, handler))
}
网关通过主动心跳检测来探测各服务实例是否可用(例如定时对配置的服务地址发起 TCP 连接)。
package load_balance

import (
	"fmt"
	"net"
	"reflect"
	"sort"
	"time"
)

// Default settings for the active health check.
const (
	// DefaultCheckMethod is not referenced in this file.
	DefaultCheckMethod = 0
	// DefaultCheckTimeout is the TCP dial timeout, in seconds.
	DefaultCheckTimeout = 2
	// DefaultCheckMaxErrNum is the number of consecutive failed dials at
	// which an address is dropped from the active list.
	DefaultCheckMaxErrNum = 2
	// DefaultCheckInterval is the pause between two check rounds, in seconds.
	DefaultCheckInterval = 5
)

// LoadBalanceCheckConf is a load-balancing configuration whose active list
// is maintained by periodically dialing every configured address.
//
// NOTE(review): activeList is written by the goroutine started in WatchConf
// and read by GetConf without synchronization; confirm that callers
// tolerate this.
type LoadBalanceCheckConf struct {
	observers    []Observer
	confIpWeight map[string]string // weight per configured address
	activeList   []string          // addresses currently considered alive
	format       string            // fmt pattern applied to each address in GetConf
}

// Attach registers o so that it is notified on every configuration update.
func (s *LoadBalanceCheckConf) Attach(o Observer) {
	s.observers = append(s.observers, o)
}

// NotifyAllObservers calls Update on every attached observer, in the order
// they were attached.
func (s *LoadBalanceCheckConf) NotifyAllObservers() {
	for _, obs := range s.observers {
		obs.Update()
	}
}

// GetConf returns one "<formatted address>,<weight>" string per active
// address. Addresses without a configured weight get the default weight "50".
func (s *LoadBalanceCheckConf) GetConf() []string {
	confList := make([]string, 0, len(s.activeList))
	for _, ip := range s.activeList {
		weight, ok := s.confIpWeight[ip]
		if !ok {
			weight = "50" // default weight
		}
		confList = append(confList, fmt.Sprintf(s.format, ip)+","+weight)
	}
	return confList
}

// WatchConf starts a goroutine that, every DefaultCheckInterval seconds,
// dials each configured address over TCP. An address stays in the active
// list while its count of consecutive failed dials is below
// DefaultCheckMaxErrNum; a successful dial resets the count. When the
// resulting set differs from the current active list, it is applied through
// UpdateConf. The goroutine never exits.
func (s *LoadBalanceCheckConf) WatchConf() {
	fmt.Println("watchConf")
	go func() {
		// Consecutive dial failures per address; a missing key reads as 0.
		confIpErrNum := map[string]int{}
		for {
			changedList := []string{}
			for item := range s.confIpWeight {
				conn, err := net.DialTimeout("tcp", item, time.Duration(DefaultCheckTimeout)*time.Second)
				// TODO: also check the HTTP status code.
				if err != nil {
					confIpErrNum[item]++
				} else {
					conn.Close()
					confIpErrNum[item] = 0
				}
				if confIpErrNum[item] < DefaultCheckMaxErrNum {
					changedList = append(changedList, item)
				}
			}

			// Compare order-insensitively. The current list is sorted on a
			// copy so the shared slice is only ever replaced by UpdateConf,
			// never reordered behind its back.
			sort.Strings(changedList)
			current := make([]string, len(s.activeList))
			copy(current, s.activeList)
			sort.Strings(current)
			if !reflect.DeepEqual(changedList, current) {
				s.UpdateConf(changedList)
			}
			time.Sleep(time.Duration(DefaultCheckInterval) * time.Second)
		}
	}()
}

// UpdateConf replaces the active list with conf and notifies all observers.
func (s *LoadBalanceCheckConf) UpdateConf(conf []string) {
	fmt.Println("UpdateConf", conf)
	s.activeList = conf
	s.NotifyAllObservers()
}

// NewLoadBalanceCheckConf builds a LoadBalanceCheckConf in which every
// address in conf starts out active, then starts the health-check goroutine.
// format is the fmt pattern applied to each address by GetConf and conf maps
// addresses to weights. The returned error is always nil.
func NewLoadBalanceCheckConf(format string, conf map[string]string) (*LoadBalanceCheckConf, error) {
	// Initially every configured address is treated as active. Map iteration
	// order is random, so the list is sorted to make the order deterministic.
	aList := make([]string, 0, len(conf))
	for item := range conf {
		aList = append(aList, item)
	}
	sort.Strings(aList)

	mConf := &LoadBalanceCheckConf{format: format, activeList: aList, confIpWeight: conf}
	mConf.WatchConf()
	return mConf, nil
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。