当前位置:   article > 正文

milvus querynode启动源码分析

milvus querynode启动源码分析

querynode启动源码分析

结构体

// QueryNode implements QueryNode grpc server
// cmd\components\query_node.go
type QueryNode struct {
	ctx context.Context
	svr *grpcquerynode.Server
}

// Server is the grpc server of QueryNode.
type Server struct {
	querynode   types.QueryNodeComponent
	wg          sync.WaitGroup
	ctx         context.Context
	cancel      context.CancelFunc
	grpcErrChan chan error

	serverID atomic.Int64

	grpcServer *grpc.Server

	etcdCli *clientv3.Client
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

querynode是一个接口,实现querynode api功能。

func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {
	wg.Add(1)
	// clear local storage
	rootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()
	queryDataLocalPath := filepath.Join(rootPath, typeutil.QueryNodeRole)
	cleanLocalDir(queryDataLocalPath)
	// clear mmap dir
	mmapDir := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()
	if len(mmapDir) > 0 {
		cleanLocalDir(mmapDir)
	}

	return runComponent(ctx, localMsg, wg, components.NewQueryNode, metrics.RegisterQueryNode)
}

// creator用NewQueryNode替换
role, err = creator(ctx, factory)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

components.NewQueryNode是一个函数。

NewQueryNode()用来创建QueryNode结构体。

// NewQueryNode creates a new QueryNode
func NewQueryNode(ctx context.Context, factory dependency.Factory) (*QueryNode, error) {
	svr, err := grpcquerynode.NewServer(ctx, factory)
	if err != nil {
		return nil, err
	}

	return &QueryNode{
		ctx: ctx,
		svr: svr,
	}, nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

grpcquerynode.NewServer()产生的是本结构体Server。

// NewServer create a new QueryNode grpc server.
func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error) {
	ctx1, cancel := context.WithCancel(ctx)

	s := &Server{
		ctx:         ctx1,
		cancel:      cancel,
		querynode:   qn.NewQueryNode(ctx, factory),
		grpcErrChan: make(chan error),
	}
	return s, nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

qn.NewQueryNode()返回一个结构体,是 types.QueryNodeComponen接口的一个实现

执行Run()

Server结构体创建后,调用结构体的Run()方法。

func runComponent[T component](ctx context.Context,
	localMsg bool,
	runWg *sync.WaitGroup,
	creator func(context.Context, dependency.Factory) (T, error),
	metricRegister func(*prometheus.Registry),
) component {
	var role T

	sign := make(chan struct{})
	go func() {
		factory := dependency.NewFactory(localMsg)
		var err error
		role, err = creator(ctx, factory)
		if localMsg {
			paramtable.SetRole(typeutil.StandaloneRole)
		} else {
			paramtable.SetRole(role.GetName())
		}
		if err != nil {
			panic(err)
		}
		close(sign)
        // 在这里调用对应组件结构体的Run()方法,这里是QueryNode结构体
		if err := role.Run(); err != nil {
			panic(err)
		}
		runWg.Done()
	}()
    ......
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

runComponent是一个包裹函数。

// Run starts service
func (q *QueryNode) Run() error {
	if err := q.svr.Run(); err != nil {
		log.Error("QueryNode starts error", zap.Error(err))
		return err
	}
	log.Debug("QueryNode successfully started")
	return nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

Run()方法调用q.svr.Run()方法。srv是grpcquerynode.NewServer()返回的结构体。

进入Run()方法:

// Run initializes and starts QueryNode's grpc service.
func (s *Server) Run() error {
	if err := s.init(); err != nil {
		return err
	}
	log.Debug("QueryNode init done ...")

	if err := s.start(); err != nil {
		return err
	}
	log.Debug("QueryNode start done ...")
	return nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

接下来分析s.init()和s.start()方法。

s.init()

// init initializes QueryNode's grpc service.
func (s *Server) init() error {
	etcdConfig := &paramtable.Get().EtcdCfg
	Params := &paramtable.Get().QueryNodeGrpcServerCfg

	if !funcutil.CheckPortAvailable(Params.Port.GetAsInt()) {
		paramtable.Get().Save(Params.Port.Key, fmt.Sprintf("%d", funcutil.GetAvailablePort()))
		log.Warn("QueryNode get available port when init", zap.Int("Port", Params.Port.GetAsInt()))
	}

	log.Debug("QueryNode", zap.Int("port", Params.Port.GetAsInt()))

	etcdCli, err := etcd.GetEtcdClient(
		etcdConfig.UseEmbedEtcd.GetAsBool(),
		etcdConfig.EtcdUseSSL.GetAsBool(),
		etcdConfig.Endpoints.GetAsStrings(),
		etcdConfig.EtcdTLSCert.GetValue(),
		etcdConfig.EtcdTLSKey.GetValue(),
		etcdConfig.EtcdTLSCACert.GetValue(),
		etcdConfig.EtcdTLSMinVersion.GetValue())
	if err != nil {
		log.Debug("QueryNode connect to etcd failed", zap.Error(err))
		return err
	}
	s.etcdCli = etcdCli
	s.SetEtcdClient(etcdCli)
	s.querynode.SetAddress(Params.GetAddress())
	log.Debug("QueryNode connect to etcd successfully")
	s.wg.Add(1)
    // 启动grpc,默认端口为21123
	go s.startGrpcLoop(Params.Port.GetAsInt())
	// wait for grpc server loop start
	err = <-s.grpcErrChan
	if err != nil {
		return err
	}

	s.querynode.UpdateStateCode(commonpb.StateCode_Initializing)
	log.Debug("QueryNode", zap.Any("State", commonpb.StateCode_Initializing))
    // 调用querynode的初始化方法
	if err := s.querynode.Init(); err != nil {
		log.Error("QueryNode init error: ", zap.Error(err))
		return err
	}

	return nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

这段可以看出来,创建了etcdCli并赋予给了s.etcdCli。

s.startGrpcLoop()启动grpc端口服务。

最终调用s.querynode.Init()进行初始化,代码位置:internal\querynodev2\server.go

s.querynode是接口类型types.QueryNodeComponent,QueryNodeComponent继承于Component。

type QueryNodeComponent interface {
    QueryNode
    UpdateStateCode(stateCode commonpb.StateCode)
    SetAddress(address string)
    GetAddress() string
    SetEtcdClient(etcdClient *clientv3.Client)
}

// QueryNode is the interface `querynode` package implements
type QueryNode interface {
	Component
	querypb.QueryNodeServer
}

// Component is the interface all services implement
type Component interface {
	Init() error
	Start() error
	Stop() error
	Register() error
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

接口套接口:

RootCoordComponent -> RootCoord -> Component
DataCoordComponent -> DataCoord -> Component
QueryCoordComponent -> QueryCoord -> Component
ProxyComponent -> Proxy -> Component
QueryNodeComponent -> QueryNode -> Component
IndexNodeComponent -> IndexNode -> Component
DataNodeComponent -> DataNode -> Component
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

各组件最终的Init()初始化代码路径:

internal\rootcoord\root_coord.go->Init()
internal\datacoord\server.go->Init()
internal\querycoordv2\server.go->Init()
internal\datanode\data_node.go->Init()
internal\indexnode\indexnode.go->Init()
internal\querynodev2\server.go->Init()
internal\proxy\proxy.go->Init()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

回过头来继续querynode的init。

// Init function init historical and streaming module to manage segments
func (node *QueryNode) Init() error {
	var initError error
	node.initOnce.Do(func() {
		// ctx := context.Background()
		log.Info("QueryNode session info", zap.String("metaPath", paramtable.Get().EtcdCfg.MetaRootPath.GetValue()))
		err := node.initSession()
		if err != nil {
			log.Error("QueryNode init session failed", zap.Error(err))
			initError = err
			return
		}

		err = node.initHook()
		if err != nil {
			// auto index cannot work if hook init failed
			if paramtable.Get().AutoIndexConfig.Enable.GetAsBool() {
				log.Error("QueryNode init hook failed", zap.Error(err))
				initError = err
				return
			}
		}

		node.factory.Init(paramtable.Get())

		localRootPath := paramtable.Get().LocalStorageCfg.Path.GetValue()
		localChunkManager := storage.NewLocalChunkManager(storage.RootPath(localRootPath))
		localUsedSize, err := segments.GetLocalUsedSize(localRootPath)
		if err != nil {
			log.Warn("get local used size failed", zap.Error(err))
			initError = err
			return
		}
		metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localUsedSize / 1024 / 1024))
		remoteChunkManager, err := node.factory.NewPersistentStorageChunkManager(node.ctx)
		if err != nil {
			log.Warn("failed to init remote chunk manager", zap.Error(err))
			initError = err
			return
		}
		node.cacheChunkManager, err = storage.NewVectorChunkManager(node.ctx,
			localChunkManager,
			remoteChunkManager,
			paramtable.Get().QueryNodeCfg.CacheMemoryLimit.GetAsInt64(),
			paramtable.Get().QueryNodeCfg.CacheEnabled.GetAsBool(),
		)
		if err != nil {
			log.Error("failed to init cache chunk manager", zap.Error(err))
			initError = err
			return
		}

		node.vectorStorage, err = node.factory.NewPersistentStorageChunkManager(node.ctx)
		if err != nil {
			log.Error("QueryNode init vector storage failed", zap.Error(err))
			initError = err
			return
		}

		log.Info("queryNode try to connect etcd success", zap.String("MetaRootPath", paramtable.Get().EtcdCfg.MetaRootPath.GetValue()))

		schedulePolicy := paramtable.Get().QueryNodeCfg.SchedulePolicyName.GetValue()
		node.scheduler = tasks.NewScheduler(
			schedulePolicy,
		)
		log.Info("queryNode init scheduler", zap.String("policy", schedulePolicy))

		node.clusterManager = cluster.NewWorkerManager(func(ctx context.Context, nodeID int64) (cluster.Worker, error) {
			if nodeID == paramtable.GetNodeID() {
				return NewLocalWorker(node), nil
			}

			sessions, _, err := node.session.GetSessions(typeutil.QueryNodeRole)
			if err != nil {
				return nil, err
			}

			addr := ""
			for _, session := range sessions {
				if session.ServerID == nodeID {
					addr = session.Address
					break
				}
			}

			client, err := grpcquerynodeclient.NewClient(ctx, addr, nodeID)
			if err != nil {
				return nil, err
			}

			return cluster.NewRemoteWorker(client), nil
		})
		node.delegators = typeutil.NewConcurrentMap[string, delegator.ShardDelegator]()
		node.subscribingChannels = typeutil.NewConcurrentSet[string]()
		node.unsubscribingChannels = typeutil.NewConcurrentSet[string]()
		node.manager = segments.NewManager()
		node.loader = segments.NewLoader(node.manager, node.vectorStorage)
		node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, paramtable.GetNodeID())
		// init pipeline manager
		node.pipelineManager = pipeline.NewManager(node.manager, node.tSafeManager, node.dispClient, node.delegators)

		err = node.InitSegcore()
		if err != nil {
			log.Error("QueryNode init segcore failed", zap.Error(err))
			initError = err
			return
		}
		if paramtable.Get().QueryNodeCfg.GCEnabled.GetAsBool() {
			if paramtable.Get().QueryNodeCfg.GCHelperEnabled.GetAsBool() {
				action := func(GOGC uint32) {
					debug.SetGCPercent(int(GOGC))
				}
				gc.NewTuner(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action)
			} else {
				action := func(uint32) {}
				gc.NewTuner(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action)
			}
		}

		log.Info("query node init successfully",
			zap.Int64("queryNodeID", paramtable.GetNodeID()),
			zap.String("Address", node.address),
		)
	})

	return initError
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127

从代码可以看出初始化是在填充QueryNode结构体。

s.start()

启动组件的逻辑。

// start starts QueryNode's grpc service.
func (s *Server) start() error {
	if err := s.querynode.Start(); err != nil {
		log.Error("QueryNode start failed", zap.Error(err))
		return err
	}
	if err := s.querynode.Register(); err != nil {
		log.Error("QueryNode register service failed", zap.Error(err))
		return err
	}
	return nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

s.querynode是一个Component接口,实现了 方法Init()、 Start() 、 Stop() 、 Register() 。

Register():向元数据etcd注册。

Start():用来启动组件。

// Start mainly start QueryNode's query service.
func (node *QueryNode) Start() error {
	node.startOnce.Do(func() {
		node.scheduler.Start()

		paramtable.SetCreateTime(time.Now())
		paramtable.SetUpdateTime(time.Now())
		mmapDirPath := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()
		mmapEnabled := len(mmapDirPath) > 0
		node.UpdateStateCode(commonpb.StateCode_Healthy)

		registry.GetInMemoryResolver().RegisterQueryNode(paramtable.GetNodeID(), node)
		log.Info("query node start successfully",
			zap.Int64("queryNodeID", paramtable.GetNodeID()),
			zap.String("Address", node.address),
			zap.Bool("mmapEnabled", mmapEnabled),
		)
	})

	return nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

node节点都没有standby,coord节点有standby。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/487327
推荐阅读
相关标签
  

闽ICP备14008679号