当前位置:   article > 正文

【微服务网关——服务发现】_服务发现服务器

服务发现服务器

1.服务发现

1.1 介绍

  • 服务发现是指用注册中心来记录服务信息,以便其他服务快速查找已注册服务
  • 服务发现分类:
    • 客户端服务发现
    • 服务端服务发现

1.2 客户端服务发现

客户端服务发现(Client-side Service Discovery)是一种微服务架构中的模式,用于让客户端应用动态地发现并调用其他服务的实例,而无需通过一个中介(例如负载均衡器或服务网关)。它通常用于分布式系统中,通过客户端直接决定并选择与哪个服务实例通信,从而实现服务发现和负载均衡。
在这里插入图片描述

1.3 服务端服务发现

服务端服务发现(Server-side Service Discovery)是另一种服务发现模式,与客户端服务发现相对。在这种模式中,服务的实例发现和负载均衡由服务端组件处理,客户端只需将请求发送给一个固定的入口点(如负载均衡器或 API 网关),由这个入口点负责将请求路由到合适的服务实例。
在这里插入图片描述

2.zookeeper

2.1 zookeeper介绍

Apache ZooKeeper 是一个用于分布式系统中的协调服务。它提供了一套高效、可靠的分布式协调工具,用于实现服务注册、配置管理、同步、领导者选举等功能。Zookeeper 的设计初衷是简化分布式应用中的协调任务,从而使应用开发更容易。

  • 是一个分布式数据库(程序协调服务),Hadoop子项目
  • 树状方式维护节点方数据的增、删、改、查
  • 监听通知机制:通过监听可以获取相应消息事件(内容,子节点)

2.2 zookeeper安装

安装zookeeper

  • 参考官方文档安装
    • http://zookeeper.apache.org/doc/r3.6.0/zookeeperStarted.html
    • 下载时需要注意下载的是编译过的二进制文件,不是源码
    • 不然会爆错:找不到或无法加载主类 org.apache.zookeeper.server.quorum.QuorumPeerMain
  • 解压缩
  • 编辑 conf/zoo.cfg
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
  • 1
  • 2
  • 3
  • 运行 bin/zkServer.sh start

2.3 zookeeper核心功能

在这里插入图片描述

  • 持久节点
    • 一直存在服务器上
  • 临时节点
    • 会话失效,节点自动清理
  • 顺序节点
    • 节点创建,自动分配序列号

2.3.1 增删改查API

package main

import (
	"fmt"
	"github.com/samuel/go-zookeeper/zk"
	"time"
)

var (
	host = []string{"127.0.0.1:2181"}
)

func main() {
	//连接客户端
	conn, _, err := zk.Connect(host, 5*time.Second)
	if err != nil {
		panic(err)
	}

	//增
	if _, err := conn.Create("/test_tree2", []byte("tree_content"),
		0, zk.WorldACL(zk.PermAll)); err != nil {
		fmt.Println("create err", err)
	}

	//查
	nodeValue, dStat, err := conn.Get("/test_tree2")
	if err != nil {
		fmt.Println("get err", err)
		return
	}
	fmt.Println("nodeValue", string(nodeValue))

	//改,需要先查询得到版本号
	if _, err := conn.Set("/test_tree2", []byte("new_content"),
		dStat.Version); err != nil {
		fmt.Println("update err", err)
	}

	//删除,也,需要先查询得到版本号
	_, dStat, _ = conn.Get("/test_tree2")
	if err := conn.Delete("/test_tree2", dStat.Version); err != nil {
		fmt.Println("Delete err", err)
		//return
	}

	//验证存在
	hasNode, _, err := conn.Exists("/test_tree2")
	if err != nil {
		fmt.Println("Exists err", err)
		//return
	}
	fmt.Println("node Exist", hasNode)

	//增加
	if _, err := conn.Create("/test_tree2", []byte("tree_content"),
		0, zk.WorldACL(zk.PermAll)); err != nil {
		fmt.Println("create err", err)
	}

	//设置子节点,如果上游节点不存在则会报错
	if _, err := conn.Create("/test_tree2/subnode", []byte("node_content"),
		0, zk.WorldACL(zk.PermAll)); err != nil {
		fmt.Println("create err", err)
	}

	//获取子节点列表
	childNodes, _, err := conn.Children("/test_tree2")
	if err != nil {
		fmt.Println("Children err", err)
	}
	fmt.Println("childNodes", childNodes)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73

2.3.2 监听子节点变化

package main

import (
	"fmt"
	"github.com/e421083458/gateway_demo/proxy/zookeeper"
	"log"
	"os"
	"os/signal"
	"syscall"
)

var addr = "127.0.0.1:2002"

func main() {
	//获取zk节点列表
	zkManager := zookeeper.NewZkManager([]string{"127.0.0.1:2181"})
	zkManager.GetConnect()
	defer zkManager.Close()
	// 注册一个节点
	err := zkManager.RegistServerPath("/real_server", "127.0.0.1")
	err = zkManager.RegistServerPath("/real_server/test", "127.0.0.1:8823")
	err = zkManager.RegistServerPath("/real_server/test2", "127.0.0.1:8823")
	if err != nil {
		return
	}
	// 获取节点列表
	zlist, err := zkManager.GetServerListByPath("/real_server")
	fmt.Println("server node:")
	fmt.Println(zlist)
	if err != nil {
		log.Println(err)
	}

	//动态监听节点变化
	chanList, chanErr := zkManager.WatchServerListByPath("/real_server")
	go func() {
		for {
			select {
			case changeErr := <-chanErr:
				fmt.Println("changeErr")
				fmt.Println(changeErr)
			case changedList := <-chanList:
				fmt.Println("watch node changed")
				fmt.Println(changedList)
			}
		}
	}()
	time.Sleep(time.Second * 5)
	zkManager.RegistServerPath("/real_server/test3", "127.0.0.2:8888")
	
	//关闭信号监听
	quit := make(chan os.Signal)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

在这里插入图片描述

2.3.3 监听节点内容变化

package main

import (
	"fmt"
	"github.com/e421083458/gateway_demo/proxy/zookeeper"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"
)

var addr = "127.0.0.1:2002"
func main() {
	//获取zk节点列表
	zkManager := zookeeper.NewZkManager([]string{"127.0.0.1:2181"})
	zkManager.GetConnect()
	defer zkManager.Close()
	// 注册一个节点
	err := zkManager.RegistServerPath("/rs_server_conf", "192.168.1.101")
	if err != nil {
		fmt.Printf("2001:%v \n", err)
		return
	}
	// 获取节点列表
	zlist, err := zkManager.GetServerListByPath("/rs_server_conf")
	fmt.Println("server node:")
	fmt.Println(zlist)
	if err != nil {
		log.Println(err)
	}
	获取节点内容
	zc, _, err := zkManager.GetPathData("/rs_server_conf")
	if err != nil {
		log.Println(err)
	}
	fmt.Println("get node data:")
	fmt.Println(string(zc))

	//动态监听节点内容
	dataChan, dataErrChan := zkManager.WatchPathData("/rs_server_conf")
	go func() {
		for {
			select {
			case changeErr := <-dataErrChan:
				fmt.Println("changeErr")
				fmt.Println(changeErr)
			case changedData := <-dataChan:
				fmt.Println("WatchGetData changed")
				fmt.Println(string(changedData))
			}
		}
	}()
	// 尝试修改内容
	time.Sleep(5 * time.Second)
	_, z, err := zkManager.GetPathData("/rs_server_conf")
	if err != nil {
		return
	}
	err = zkManager.SetPathData("/rs_server_conf", []byte(addr), z.Version)
	if err != nil {
		fmt.Sprintf("2002:%v \n", err)
		return
	}
	//关闭信号监听
	quit := make(chan os.Signal)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

在这里插入图片描述

3.网关实现服务发现原理

3.1 网关实现客户端服务发现

在这里插入图片描述

3.2 网关实现服务端服务发现

在这里插入图片描述

  • 将服务注册到zookeeper中
  • 网关通过监听zookeeper中的事件来感知变化

4.网关拓展服务发现

  • 下游机器启动时创建临时节点:节点名与内容为服务地址
  • 以观察者模式构建负载均衡配置LoadBalanceConf
  • 负载均衡配置LoadBalanceConf与负载均衡器整合

4.1 观察者模式

观察者模式(Observer Pattern)是一种行为设计模式,它定义了一种一对多的依赖关系,使得多个观察者对象可以同时监听某一个主题对象。当这个主题对象的状态发生变化时,会通知所有观察者对象,使它们能够自动更新。观察者模式常用于实现事件处理系统,如用户界面事件、订阅/发布系统等。

  • 关键概念
  • 主题(Subject):也称为发布者(Publisher),它维护一组观察者对象,并提供注册和移除观察者的方法。当主题的状态发生变化时,会通知所有观察者。
  • 观察者(Observer):也称为订阅者(Subscriber),它定义了一个更新接口,用于接收来自主题的通知。每个观察者在接收到通知后,可以执行特定的操作。
  • 通知(Notification):指的是主题状态变化时向观察者发送的信号或消息。

4.2 以观察者模式构建负载均衡配置

在这里插入图片描述

package load_balance

import (
	"fmt"
	"github.com/e421083458/gateway_demo/proxy/zookeeper"
)

// 配置主题
type LoadBalanceConf interface {
	Attach(o Observer)
	GetConf() []string
	WatchConf()
	UpdateConf(conf []string)
}

type LoadBalanceZkConf struct {
	observers    []Observer
	path         string
	zkHosts      []string
	confIpWeight map[string]string
	activeList   []string
	format       string
}

func (s *LoadBalanceZkConf) Attach(o Observer) {
	s.observers = append(s.observers, o)
}

func (s *LoadBalanceZkConf) NotifyAllObservers() {
	for _, obs := range s.observers {
		obs.Update()
	}
}

func (s *LoadBalanceZkConf) GetConf() []string {
	confList := []string{}
	for _, ip := range s.activeList {
		weight, ok := s.confIpWeight[ip]
		if !ok {
			weight = "50" //默认weight
		}
		confList = append(confList, fmt.Sprintf(s.format, ip)+","+weight)
	}
	return confList
}

//更新配置时,通知监听者也更新
func (s *LoadBalanceZkConf) WatchConf() {
	zkManager := zookeeper.NewZkManager(s.zkHosts)
	zkManager.GetConnect()
	fmt.Println("watchConf")
	chanList, chanErr := zkManager.WatchServerListByPath(s.path)
	go func() {
		defer zkManager.Close()
		for {
			select {
			case changeErr := <-chanErr:
				fmt.Println("changeErr", changeErr)
			case changedList := <-chanList:
				fmt.Println("watch node changed")
				s.UpdateConf(changedList)
			}
		}
	}()
}

//更新配置时,通知监听者也更新
func (s *LoadBalanceZkConf) UpdateConf(conf []string) {
	s.activeList = conf
	for _, obs := range s.observers {
		obs.Update()
	}
}

func NewLoadBalanceZkConf(format, path string, zkHosts []string, conf map[string]string) (*LoadBalanceZkConf, error) {
	zkManager := zookeeper.NewZkManager(zkHosts)
	zkManager.GetConnect()
	defer zkManager.Close()
	zlist, err := zkManager.GetServerListByPath(path)
	if err != nil {
		return nil, err
	}
	mConf := &LoadBalanceZkConf{format: format, activeList: zlist, confIpWeight: conf, zkHosts: zkHosts, path: path}
	mConf.WatchConf()
	return mConf, nil
}

type Observer interface {
	Update()
}

type LoadBalanceObserver struct {
	ModuleConf *LoadBalanceZkConf
}

func (l *LoadBalanceObserver) Update() {
	fmt.Println("Update get conf:", l.ModuleConf.GetConf())
}

func NewLoadBalanceObserver(conf *LoadBalanceZkConf) *LoadBalanceObserver {
	return &LoadBalanceObserver{
		ModuleConf: conf,
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105

4.3 负载均衡配置LoadBalanceConf与负载均衡器整合

package main

import (
	"github.com/e421083458/gateway_demo/proxy/load_balance"
	"github.com/e421083458/gateway_demo/proxy/middleware"
	proxy2 "github.com/e421083458/gateway_demo/proxy/proxy"
	"log"
	"net/http"
)

var (
	addr = "127.0.0.1:2002"
)

func main() {
	mConf, err := load_balance.NewLoadBalanceZkConf("http://%s/base",
		"/real_server",
		[]string{"127.0.0.1:2181"},
		map[string]string{"127.0.0.1:2003": "20"})
	if err != nil {
		panic(err)
	}
	rb := load_balance.LoadBanlanceFactorWithConf(load_balance.LbWeightRoundRobin, mConf)
	proxy := proxy2.NewLoadBalanceReverseProxy(&middleware.SliceRouterContext{}, rb)
	log.Println("Starting httpserver at " + addr)
	log.Fatal(http.ListenAndServe(addr, proxy))
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

4.4 客户端服务发现实现

网关主动通过心跳检测区检测客户端的服务
在这里插入图片描述

  • 下游机器启动时无需进行任何操作
  • 以观察者模式构建负载均衡配置LoadBalanceConf
  • 负载均衡配置固定时间频率监测下游节点健康状况
package load_balance

import (
	"fmt"
	"net"
	"reflect"
	"sort"
	"time"
)

const (
	//default check setting
	DefaultCheckMethod    = 0
	DefaultCheckTimeout   = 2
	DefaultCheckMaxErrNum = 2
	DefaultCheckInterval  = 5
)

type LoadBalanceCheckConf struct {
	observers    []Observer
	confIpWeight map[string]string
	activeList   []string
	format       string
}

func (s *LoadBalanceCheckConf) Attach(o Observer) {
	s.observers = append(s.observers, o)
}

func (s *LoadBalanceCheckConf) NotifyAllObservers() {
	for _, obs := range s.observers {
		obs.Update()
	}
}

func (s *LoadBalanceCheckConf) GetConf() []string {
	confList := []string{}
	for _, ip := range s.activeList {
		weight, ok := s.confIpWeight[ip]
		if !ok {
			weight = "50" //默认weight
		}
		confList = append(confList, fmt.Sprintf(s.format, ip)+","+weight)
	}
	return confList
}

//更新配置时,通知监听者也更新
func (s *LoadBalanceCheckConf) WatchConf() {
	fmt.Println("watchConf")
	go func() {
		confIpErrNum := map[string]int{}
		for {
			changedList := []string{}
			for item, _ := range s.confIpWeight {
				conn, err := net.DialTimeout("tcp", item, time.Duration(DefaultCheckTimeout)*time.Second)
				//todo http statuscode
				if err == nil {
					conn.Close()
					if _, ok := confIpErrNum[item]; ok {
						confIpErrNum[item] = 0
					}
				}
				if err != nil {
					if _, ok := confIpErrNum[item]; ok {
						confIpErrNum[item] += 1
					} else {
						confIpErrNum[item] = 1
					}
				}
				if confIpErrNum[item] < DefaultCheckMaxErrNum {
					changedList = append(changedList, item)
				}
			}
			sort.Strings(changedList)
			sort.Strings(s.activeList)
			if !reflect.DeepEqual(changedList, s.activeList) {
				s.UpdateConf(changedList)
			}
			time.Sleep(time.Duration(DefaultCheckInterval) * time.Second)
		}
	}()
}

//更新配置时,通知监听者也更新
func (s *LoadBalanceCheckConf) UpdateConf(conf []string) {
	fmt.Println("UpdateConf", conf)
	s.activeList = conf
	for _, obs := range s.observers {
		obs.Update()
	}
}

func NewLoadBalanceCheckConf(format string, conf map[string]string) (*LoadBalanceCheckConf, error) {
	aList := []string{}
	//默认初始化
	for item, _ := range conf {
		aList = append(aList, item)
	}
	mConf := &LoadBalanceCheckConf{format: format, activeList: aList, confIpWeight: conf}
	mConf.WatchConf()
	return mConf, nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/804842
推荐阅读
相关标签
  

闽ICP备14008679号