赞
踩
// QueryNode implements QueryNode grpc server // cmd\components\query_node.go type QueryNode struct { ctx context.Context svr *grpcquerynode.Server } // Server is the grpc server of QueryNode. type Server struct { querynode types.QueryNodeComponent wg sync.WaitGroup ctx context.Context cancel context.CancelFunc grpcErrChan chan error serverID atomic.Int64 grpcServer *grpc.Server etcdCli *clientv3.Client }
querynode是一个接口,实现querynode api功能。
func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component { wg.Add(1) // clear local storage rootPath := paramtable.Get().LocalStorageCfg.Path.GetValue() queryDataLocalPath := filepath.Join(rootPath, typeutil.QueryNodeRole) cleanLocalDir(queryDataLocalPath) // clear mmap dir mmapDir := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue() if len(mmapDir) > 0 { cleanLocalDir(mmapDir) } return runComponent(ctx, localMsg, wg, components.NewQueryNode, metrics.RegisterQueryNode) } // creator用NewQueryNode替换 role, err = creator(ctx, factory)
components.NewQueryNode是一个函数。
NewQueryNode()用来创建QueryNode结构体。
// NewQueryNode builds the QueryNode component wrapper.
// It asks grpcquerynode.NewServer for a grpc server and, if that succeeds,
// returns a QueryNode holding ctx and that server. Any error from
// grpcquerynode.NewServer is returned unchanged with a nil QueryNode.
func NewQueryNode(ctx context.Context, factory dependency.Factory) (*QueryNode, error) {
	server, err := grpcquerynode.NewServer(ctx, factory)
	if err != nil {
		return nil, err
	}

	node := &QueryNode{ctx: ctx, svr: server}
	return node, nil
}
grpcquerynode.NewServer()产生的是本结构体Server。
// NewServer constructs the QueryNode grpc Server.
// The server gets a cancellable context derived from ctx, while the inner
// querynode component is created from the caller's ctx directly.
// The returned error is always nil.
func NewServer(ctx context.Context, factory dependency.Factory) (*Server, error) {
	serverCtx, stop := context.WithCancel(ctx)
	node := qn.NewQueryNode(ctx, factory)

	return &Server{
		ctx:         serverCtx,
		cancel:      stop,
		querynode:   node,
		grpcErrChan: make(chan error),
	}, nil
}
qn.NewQueryNode()返回一个结构体,是types.QueryNodeComponent接口的一个实现。
Server结构体创建后,调用结构体的Run()方法。
func runComponent[T component](ctx context.Context, localMsg bool, runWg *sync.WaitGroup, creator func(context.Context, dependency.Factory) (T, error), metricRegister func(*prometheus.Registry), ) component { var role T sign := make(chan struct{}) go func() { factory := dependency.NewFactory(localMsg) var err error role, err = creator(ctx, factory) if localMsg { paramtable.SetRole(typeutil.StandaloneRole) } else { paramtable.SetRole(role.GetName()) } if err != nil { panic(err) } close(sign) // 在这里调用对应组件结构体的Run()方法,这里是QueryNode结构体 if err := role.Run(); err != nil { panic(err) } runWg.Done() }() ...... }
runComponent是一个包裹函数。
// Run starts the QueryNode by delegating to the wrapped grpc server's Run.
// A failure is logged at error level and returned; on success a debug
// message is logged and nil is returned.
func (q *QueryNode) Run() error {
	err := q.svr.Run()
	if err == nil {
		log.Debug("QueryNode successfully started")
		return nil
	}

	log.Error("QueryNode starts error", zap.Error(err))
	return err
}
Run()方法调用q.svr.Run()方法。svr是grpcquerynode.NewServer()返回的结构体。
进入Run()方法:
// Run brings the QueryNode grpc service up in two phases: init, then start.
// The first phase that fails aborts the sequence and its error is returned.
func (s *Server) Run() error {
	err := s.init()
	if err != nil {
		return err
	}
	log.Debug("QueryNode init done ...")

	err = s.start()
	if err != nil {
		return err
	}
	log.Debug("QueryNode start done ...")

	return nil
}
接下来分析s.init()和s.start()方法。
// init initializes QueryNode's grpc service. func (s *Server) init() error { etcdConfig := ¶mtable.Get().EtcdCfg Params := ¶mtable.Get().QueryNodeGrpcServerCfg if !funcutil.CheckPortAvailable(Params.Port.GetAsInt()) { paramtable.Get().Save(Params.Port.Key, fmt.Sprintf("%d", funcutil.GetAvailablePort())) log.Warn("QueryNode get available port when init", zap.Int("Port", Params.Port.GetAsInt())) } log.Debug("QueryNode", zap.Int("port", Params.Port.GetAsInt())) etcdCli, err := etcd.GetEtcdClient( etcdConfig.UseEmbedEtcd.GetAsBool(), etcdConfig.EtcdUseSSL.GetAsBool(), etcdConfig.Endpoints.GetAsStrings(), etcdConfig.EtcdTLSCert.GetValue(), etcdConfig.EtcdTLSKey.GetValue(), etcdConfig.EtcdTLSCACert.GetValue(), etcdConfig.EtcdTLSMinVersion.GetValue()) if err != nil { log.Debug("QueryNode connect to etcd failed", zap.Error(err)) return err } s.etcdCli = etcdCli s.SetEtcdClient(etcdCli) s.querynode.SetAddress(Params.GetAddress()) log.Debug("QueryNode connect to etcd successfully") s.wg.Add(1) // 启动grpc,默认端口为21123 go s.startGrpcLoop(Params.Port.GetAsInt()) // wait for grpc server loop start err = <-s.grpcErrChan if err != nil { return err } s.querynode.UpdateStateCode(commonpb.StateCode_Initializing) log.Debug("QueryNode", zap.Any("State", commonpb.StateCode_Initializing)) // 调用querynode的初始化方法 if err := s.querynode.Init(); err != nil { log.Error("QueryNode init error: ", zap.Error(err)) return err } return nil }
从这段代码可以看出,创建了etcdCli并赋值给了s.etcdCli。
s.startGrpcLoop()启动grpc端口服务。
最终调用s.querynode.Init()进行初始化,代码位置:internal\querynodev2\server.go
s.querynode是接口类型types.QueryNodeComponent,QueryNodeComponent内嵌了QueryNode接口,QueryNode又内嵌了Component接口。
type QueryNodeComponent interface { QueryNode UpdateStateCode(stateCode commonpb.StateCode) SetAddress(address string) GetAddress() string SetEtcdClient(etcdClient *clientv3.Client) } // QueryNode is the interface `querynode` package implements type QueryNode interface { Component querypb.QueryNodeServer } // Component is the interface all services implement type Component interface { Init() error Start() error Stop() error Register() error }
接口套接口:
RootCoordComponent -> RootCoord -> Component
DataCoordComponent -> DataCoord -> Component
QueryCoordComponent -> QueryCoord -> Component
ProxyComponent -> Proxy -> Component
QueryNodeComponent -> QueryNode -> Component
IndexNodeComponent -> IndexNode -> Component
DataNodeComponent -> DataNode -> Component
各组件最终的Init()初始化代码路径:
internal\rootcoord\root_coord.go->Init()
internal\datacoord\server.go->Init()
internal\querycoordv2\server.go->Init()
internal\datanode\data_node.go->Init()
internal\indexnode\indexnode.go->Init()
internal\querynodev2\server.go->Init()
internal\proxy\proxy.go->Init()
回过头来继续querynode的init。
// Init function init historical and streaming module to manage segments func (node *QueryNode) Init() error { var initError error node.initOnce.Do(func() { // ctx := context.Background() log.Info("QueryNode session info", zap.String("metaPath", paramtable.Get().EtcdCfg.MetaRootPath.GetValue())) err := node.initSession() if err != nil { log.Error("QueryNode init session failed", zap.Error(err)) initError = err return } err = node.initHook() if err != nil { // auto index cannot work if hook init failed if paramtable.Get().AutoIndexConfig.Enable.GetAsBool() { log.Error("QueryNode init hook failed", zap.Error(err)) initError = err return } } node.factory.Init(paramtable.Get()) localRootPath := paramtable.Get().LocalStorageCfg.Path.GetValue() localChunkManager := storage.NewLocalChunkManager(storage.RootPath(localRootPath)) localUsedSize, err := segments.GetLocalUsedSize(localRootPath) if err != nil { log.Warn("get local used size failed", zap.Error(err)) initError = err return } metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localUsedSize / 1024 / 1024)) remoteChunkManager, err := node.factory.NewPersistentStorageChunkManager(node.ctx) if err != nil { log.Warn("failed to init remote chunk manager", zap.Error(err)) initError = err return } node.cacheChunkManager, err = storage.NewVectorChunkManager(node.ctx, localChunkManager, remoteChunkManager, paramtable.Get().QueryNodeCfg.CacheMemoryLimit.GetAsInt64(), paramtable.Get().QueryNodeCfg.CacheEnabled.GetAsBool(), ) if err != nil { log.Error("failed to init cache chunk manager", zap.Error(err)) initError = err return } node.vectorStorage, err = node.factory.NewPersistentStorageChunkManager(node.ctx) if err != nil { log.Error("QueryNode init vector storage failed", zap.Error(err)) initError = err return } log.Info("queryNode try to connect etcd success", zap.String("MetaRootPath", paramtable.Get().EtcdCfg.MetaRootPath.GetValue())) schedulePolicy := 
paramtable.Get().QueryNodeCfg.SchedulePolicyName.GetValue() node.scheduler = tasks.NewScheduler( schedulePolicy, ) log.Info("queryNode init scheduler", zap.String("policy", schedulePolicy)) node.clusterManager = cluster.NewWorkerManager(func(ctx context.Context, nodeID int64) (cluster.Worker, error) { if nodeID == paramtable.GetNodeID() { return NewLocalWorker(node), nil } sessions, _, err := node.session.GetSessions(typeutil.QueryNodeRole) if err != nil { return nil, err } addr := "" for _, session := range sessions { if session.ServerID == nodeID { addr = session.Address break } } client, err := grpcquerynodeclient.NewClient(ctx, addr, nodeID) if err != nil { return nil, err } return cluster.NewRemoteWorker(client), nil }) node.delegators = typeutil.NewConcurrentMap[string, delegator.ShardDelegator]() node.subscribingChannels = typeutil.NewConcurrentSet[string]() node.unsubscribingChannels = typeutil.NewConcurrentSet[string]() node.manager = segments.NewManager() node.loader = segments.NewLoader(node.manager, node.vectorStorage) node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, paramtable.GetNodeID()) // init pipeline manager node.pipelineManager = pipeline.NewManager(node.manager, node.tSafeManager, node.dispClient, node.delegators) err = node.InitSegcore() if err != nil { log.Error("QueryNode init segcore failed", zap.Error(err)) initError = err return } if paramtable.Get().QueryNodeCfg.GCEnabled.GetAsBool() { if paramtable.Get().QueryNodeCfg.GCHelperEnabled.GetAsBool() { action := func(GOGC uint32) { debug.SetGCPercent(int(GOGC)) } gc.NewTuner(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action) } else { action := func(uint32) {} gc.NewTuner(paramtable.Get().QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat(), 
uint32(paramtable.Get().QueryNodeCfg.MinimumGOGCConfig.GetAsInt()), uint32(paramtable.Get().QueryNodeCfg.MaximumGOGCConfig.GetAsInt()), action) } } log.Info("query node init successfully", zap.Int64("queryNodeID", paramtable.GetNodeID()), zap.String("Address", node.address), ) }) return initError }
从代码可以看出初始化是在填充QueryNode结构体。
启动组件的逻辑。
// start launches the querynode component and then registers it.
// Start runs first; Register is only attempted when Start succeeds.
// Whichever step fails is logged at error level and its error is returned.
func (s *Server) start() error {
	err := s.querynode.Start()
	if err != nil {
		log.Error("QueryNode start failed", zap.Error(err))
		return err
	}

	err = s.querynode.Register()
	if err != nil {
		log.Error("QueryNode register service failed", zap.Error(err))
		return err
	}

	return nil
}
s.querynode是一个Component接口,实现了Init()、Start()、Stop()、Register()方法。
Register():向元数据etcd注册。
Start():用来启动组件。
// Start mainly start QueryNode's query service. func (node *QueryNode) Start() error { node.startOnce.Do(func() { node.scheduler.Start() paramtable.SetCreateTime(time.Now()) paramtable.SetUpdateTime(time.Now()) mmapDirPath := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue() mmapEnabled := len(mmapDirPath) > 0 node.UpdateStateCode(commonpb.StateCode_Healthy) registry.GetInMemoryResolver().RegisterQueryNode(paramtable.GetNodeID(), node) log.Info("query node start successfully", zap.Int64("queryNodeID", paramtable.GetNodeID()), zap.String("Address", node.address), zap.Bool("mmapEnabled", mmapEnabled), ) }) return nil }
node节点都没有standby,coord节点有standby。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。