赞
踩
kubernetes基础组件的入口均在cmd目录下,kube-scheduler入口在scheduler.go下。
kubernetes所有的组件启动采用的均是command的形式,引用的是spf13类库。
- // main is the kube-scheduler entry point: it builds the Cobra command,
- // normalizes flags, initializes logging, and executes the command.
- func main() {
- rand.Seed(time.Now().UnixNano())
-
- // Create the Cobra-style scheduler command.
- command := app.NewSchedulerCommand()
-
- // TODO: once we switch everything over to Cobra commands, we can go back to calling
- // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
- // normalize func and add the go flag set by hand.
-
- // Normalize flag names: convert '_' characters to '-'.
- pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
- // utilflag.InitFlags()
- logs.InitLogs()
- defer logs.FlushLogs()
-
- // Execute the scheduler command; exit non-zero on error.
- if err := command.Execute(); err != nil {
- fmt.Fprintf(os.Stderr, "%v\n", err)
- os.Exit(1)
- }
- }
通过将配置文件转化成command的形式,调用Execute方法执行定义的Run方法
- // Run delegates to runCommand; any error is printed and the process exits non-zero.
- Run: func(cmd *cobra.Command, args []string) {
- if err := runCommand(cmd, args, opts); err != nil {
- fmt.Fprintf(os.Stderr, "%v\n", err)
- os.Exit(1)
- }
- },
进入runCommand方法,通过完成配置的初始化,调用Run方法,进一步启动。
- // runCommand runs the scheduler.func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
- 。。。
- // Get the completed config
- cc := c.Complete()
-
- // To help debugging, immediately log version
- klog.Infof("Version: %+v", version.Get())
-
- // Apply algorithms based on feature gates.
- // TODO: make configurable?
- algorithmprovider.ApplyFeatureGates()
-
- // Configz registration.
- if cz, err := configz.New("componentconfig"); err == nil {
- cz.Set(cc.ComponentConfig)
- } else {
- return fmt.Errorf("unable to register configz: %s", err)
- }
-
- // Hand off to Run, which starts informers, health/metrics endpoints,
- // optional leader election, and the scheduling loop.
- return Run(cc, stopCh)
- }
Run方法主要做了以下工作:
1、判断是否需要添加VolumeScheduling新特性;
2、初始化调度参数的相关结构体;
3、配置准备事件广播;
4、健康检查相关配置;
5、Metrics相关配置;
6、启动所有的Informer(kubernetes主要就是通过Informer和Workqueue机制监听事件的变化);
7、判断是否需要LeaderElection,决定最终的启动
Run(cc, stopCh)->sched.Run()->sched.scheduleOne
scheduleOne,顾名思义,每次调度一个Pod,整体流程如下:
- // scheduleOne performs the full scheduling workflow for a single pod:
- // take the next pod, run the scheduling algorithm (falling back to
- // preemption on fit failure), assume the pod in the cache, and bind
- // it asynchronously via the apiserver.
- func (sched *Scheduler) scheduleOne() {
- pod := sched.config.NextPod()
- // pod could be nil when schedulerQueue is closed
- if pod == nil {
- return
- }
- if pod.DeletionTimestamp != nil {
- sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
- klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
- return
- }
-
- klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)
-
- // Synchronously attempt to find a fit for the pod.
- start := time.Now()
- // The actual scheduling algorithm lives in the Schedule method of generic_scheduler.go.
- // schedule() may fail because the pod fits no host; in that case we try
- // preemption, expecting the pod to fit on the next attempt thanks to the
- // preempted resources. A different pod may also be scheduled into those
- // resources, which is harmless.
- suggestedHost, err := sched.schedule(pod)
- if err != nil {
- // When the normal flow finds no suitable node (mostly: predicates
- // filtered out every node), decide whether preemption should run;
- // see the preempt method in pkg/scheduler/scheduler.go.
- if fitError, ok := err.(*core.FitError); ok {
- preemptionStartTime := time.Now()
-
- sched.preempt(pod, fitError)
- metrics.PreemptionAttempts.Inc()
- metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
- metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
-
- metrics.PodScheduleFailures.Inc()
- } else {
- klog.Errorf("error selecting node for pod: %v", err)
- metrics.PodScheduleErrors.Inc()
- }
- return
- }
- metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
-
- // Cache the pod/node assignment so scheduling can keep going without
- // waiting for each binding to finish (binding is slow).
- assumedPod := pod.DeepCopy()
-
- // The fact that a pod is planned onto machine A is called an "assume":
- // the tentative placement is stored so it is visible during scheduling
- // and therefore influences the scheduling of other pods.
- allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
- if err != nil {
- klog.Errorf("error assuming volumes: %v", err)
- metrics.PodScheduleErrors.Inc()
- return
- }
-
- // assume modifies `assumedPod` by setting NodeName=suggestedHost // i.e. write the host name into NodeName and store it in the scheduler cache
- err = sched.assume(assumedPod, suggestedHost)
- if err != nil {
- klog.Errorf("error assuming pod: %v", err)
- metrics.PodScheduleErrors.Inc()
- return
- }
- // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
- // Ask the apiserver to perform the final binding (persisted to etcd) asynchronously.
- go func() {
- // Bind volumes first before Pod
- if !allBound {
- err := sched.bindVolumes(assumedPod)
- if err != nil {
- klog.Errorf("error binding volumes: %v", err)
- metrics.PodScheduleErrors.Inc()
- return
- }
- }
-
- err := sched.bind(assumedPod, &v1.Binding{
- ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
- Target: v1.ObjectReference{
- Kind: "Node",
- Name: suggestedHost,
- },
- })
- metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
- if err != nil {
- klog.Errorf("error binding pod: %v", err)
- metrics.PodScheduleErrors.Inc()
- } else {
- metrics.PodScheduleSuccesses.Inc()
- }
- }()
- }
主要做了以下工作:
1、从队列中取出待调度的Pod
2、根据调度算法(预选+优选)获取待调度Pod匹配的主机,如果未获取到合适的主机,判断是否需要preempt,即Pod的抢占策略,为Pod分配节点
3、将当前Pod缓存起来,假定已经绑定成功(主要是为了将scheduling与binding过程分开)
4、判断是否需要VolumeScheduling特性继续添加Pod信息
5、Pod对应的NodeName写上主机名(调度的本质就是将为空的NodeName写上相应的Node的值)
6、启动新的binding协程,请求apiserver,异步处理最终的绑定,将结果写入到etcd中
sched.scheduleOne -> sched.schedule(pod) -> generic_scheduler.go
- // Schedule selects a host for the given pod: basic checks, predicates
- // (filtering), priorities (scoring), then host selection.
- func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
- trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
- defer trace.LogIfLong(100 * time.Millisecond)
- // Basic sanity checks on the pod, including its PVCs.
- if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
- return "", err
- }
- // Fetch the list of candidate nodes.
- nodes, err := nodeLister.List()
- if err != nil {
- return "", err
- }
- ...
-
- trace.Step("Computing predicates")
- startPredicateEvalTime := time.Now()
- // Predicate (filtering) phase of the scheduling algorithm.
- filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
- if err != nil {
- return "", err
- }
- ...
-
- metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
- // Priority phase: PrioritizeNodes scores every node that passed the predicates.
- priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders) // priority (scoring) algorithms
- if err != nil {
- return "", err
- }
- metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
- metrics.SchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))
-
- trace.Step("Selecting host")
- // Pick the node with the highest score; ties are broken by random selection.
- return g.selectHost(priorityList)
- }
如果未自定义调度器,则启用默认的调度器genericScheduler,genericScheduler的Schedule方法如下:
预选算法调用的接口是findNodesThatFit,主要代码如下:
- // findNodesThatFit runs the predicate phase: it returns the nodes the pod
- // can run on, plus the failure reasons for nodes that were filtered out.
- func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
- var filtered []*v1.Node
- failedPredicateMap := FailedPredicateMap{}
-
- // If no predicates are configured, every node is considered a fit.
- if len(g.predicates) == 0 {
- filtered = nodes
- } else {
- allNodes := int32(g.cache.NodeTree().NumNodes)
- // numFeasibleNodesToFind caps how many nodes are returned per attempt so the result slice does not grow too large.
- numNodesToFind := g.numFeasibleNodesToFind(allNodes)
-
- // Create filtered list with enough space to avoid growing it
- // and allow assigning.
- filtered = make([]*v1.Node, numNodesToFind)
- errs := errors.MessageCountMap{}
- var (
- predicateResultLock sync.Mutex
- filteredLen int32
- equivClass *equivalence.Class
- )
-
- ctx, cancel := context.WithCancel(context.Background())
-
- // We can use the same metadata producer for all nodes.
- meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
-
- if g.equivalenceCache != nil {
- // getEquivalenceClassInfo will return immediately if no equivalence pod found
- equivClass = equivalence.NewClass(pod)
- }
-
- // checkNode evaluates all predicates for a single node.
- checkNode := func(i int) {
- var nodeCache *equivalence.NodeCache
- // Fetch the next node to examine.
- nodeName := g.cache.NodeTree().Next()
- if g.equivalenceCache != nil {
- nodeCache = g.equivalenceCache.LoadNodeCache(nodeName)
- }
- // podFitsOnNode is where the actual fit decision is made.
- fits, failedPredicates, err := podFitsOnNode(
- pod,
- meta,
- g.cachedNodeInfoMap[nodeName],
- g.predicates,
- nodeCache,
- g.schedulingQueue,
- g.alwaysCheckAllPredicates,
- equivClass,
- )
- if err != nil {
- predicateResultLock.Lock()
- errs[err.Error()]++
- predicateResultLock.Unlock()
- return
- }
- if fits {
- // Keep the number of returned nodes within numNodesToFind.
- length := atomic.AddInt32(&filteredLen, 1)
- if length > numNodesToFind {
- // Tell ParallelizeUntil that the search is done.
- cancel()
- atomic.AddInt32(&filteredLen, -1)
- } else {
- filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
- }
- } else {
- predicateResultLock.Lock()
- failedPredicateMap[nodeName] = failedPredicates
- predicateResultLock.Unlock()
- }
- }
-
- // Stops searching for more nodes once the configured number of feasible nodes
- // are found.
- // Run checkNode for many nodes in parallel (16 workers).
- workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
-
- filtered = filtered[:filteredLen]
- if len(errs) > 0 {
- return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
- }
- }
-
- // If scheduler extenders are configured, pass the filtered nodes through
- // each extender in turn to get the final list of matching nodes.
- if len(filtered) > 0 && len(g.extenders) != 0 {
- for _, extender := range g.extenders {
- if !extender.IsInterested(pod) {
- continue
- }
- filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
- if err != nil {
- if extender.IsIgnorable() {
- klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
- extender, err)
- continue
- } else {
- return []*v1.Node{}, FailedPredicateMap{}, err
- }
- }
-
- for failedNodeName, failedMsg := range failedMap {
- if _, found := failedPredicateMap[failedNodeName]; !found {
- failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
- }
- failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
- }
- filtered = filteredList
- if len(filtered) == 0 {
- break
- }
- }
- }
- return filtered, failedPredicateMap, nil
- }
findNodesThatFit主要做了几个操作
1、判断是否配置了预选算法,如果没有,直接返回Node列表信息;
2、如果配置了预选算法,则同时对多个Node(最多一次16个)调用checkNode方法,判断Pod是否可以调度在该Node上;
3、预选筛选之后,如果配置了调度的扩展算法,需要继续对筛选后的Pod与Node进行再一次的筛选,获取最终匹配的Node列表。
这里有一个注意的地方,获取匹配的Node节点数量时,通过numFeasibleNodesToFind函数限制了每次获取的节点数,最大值为100。这样当匹配到相应的Node数时,checkNode方法不再调用。
这里个人觉着有些问题,当Node数量足够多的时候(大于100),由于numFeasibleNodesToFind限制了Node数量,导致并不能扫描到所有的Node,这样可能导致最合适的Node没有被扫描到,匹配到的只是较优先的Node,则最终调度到的Node也不是最合适的Node,只是相较于比较合适。
预选调度实际接口:podFitsOnNode
最终实现调度判断的接口是podFitsOnNode。
这里的逻辑是从一个for循环开始的,关于这个2次循环的含义代码里有很长的一段注释,我们先看一下注释里怎么说的(这里可以多看几遍体会一下):出于某些原因考虑我们需要运行两次predicate. 如果node上有更高或者相同优先级的“指定pods”(这里的“指定pods”指的是通过schedule计算后指定要跑在一个node上但是还未真正运行到那个node上的pods),我们将这些pods加入到meta和nodeInfo后执行一次计算过程。如果这个过程所有的predicates都成功了,我们再假设这些“指定pods”不会跑到node上再运行一次。第二次计算是必须的,因为有一些predicates比如pod亲和性,也许在“指定pods”没有成功跑到node的情况下会不满足。如果没有“指定pods”或者第一次计算过程失败了,那么第二次计算不会进行。我们在第一次调度的时候只考虑相等或者更高优先级的pods,因为这些pod是当前pod必须“臣服”的,也就是说不能够从这些pod中抢到资源,这些pod不会被当前pod“抢占”;这样当前pod也就能够安心从低优先级的pod手里抢资源了。新pod在上述2种情况下都可调度基于一个保守的假设:资源和pod反亲和性等的predicate在“指定pods”被处理为Running时更容易失败;pod亲和性在“指定pods”被处理为Not Running时更加容易失败。我们不能假设“指定pods”是Running的因为它们当前还没有运行,而且事实上,它们确实有可能最终又被调度到其他node上了。
- // podFitsOnNode decides whether the pod fits on one node by running the
- // ordered predicates, possibly twice (see the loop comment below).
- func podFitsOnNode(
- pod *v1.Pod,
- meta algorithm.PredicateMetadata,
- info *schedulercache.NodeInfo,
- predicateFuncs map[string]algorithm.FitPredicate,
- nodeCache *equivalence.NodeCache,
- queue internalqueue.SchedulingQueue,
- alwaysCheckAllPredicates bool,
- equivClass *equivalence.Class,
- ) (bool, []algorithm.PredicateFailureReason, error) {
- var (
- eCacheAvailable bool
- failedPredicates []algorithm.PredicateFailureReason
- )
-
- podsAdded := false
- // First pass: add every nominated pod of equal or higher priority to the
- // node, updating meta and nodeInfo. Nominated pods are pods already
- // assigned to the node but not actually running yet. This guarantees the
- // current pod's admission cannot break higher-priority pods.
- // Pass 1: predicates run against meta/nodeInfo updated with nominated pods.
- // Pass 2: meta/nodeInfo are left unchanged, so the pod does not depend on
- // the nominated pods (important for things like pod affinity: a predicate
- // may fail if a nominated pod never actually lands on this node).
- // The second pass leaves nominated pods out because, with pod affinity for
- // example, the current pod must not rely on them — nominated pods are not
- // guaranteed to end up on this node.
- for i := 0; i < 2; i++ {
- metaToUse := meta
- nodeInfoToUse := info
- if i == 0 {
- podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
- } else if !podsAdded || len(failedPredicates) != 0 {
- break
- }
- // Bypass eCache if node has any nominated pods.
- // TODO(bsalamat): consider using eCache and adding proper eCache invalidations
- // when pods are nominated or their nominations change.
- eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
- for predicateID, predicateKey := range predicates.Ordering() {
- var (
- fit bool
- reasons []algorithm.PredicateFailureReason
- err error
- )
- //TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
-
- // Predicate functions are registered via sc, err := configurator.CreateFromConfig(*policy);
- // see pkg/scheduler/algorithmprovider/defaults/defaults.go, where defaultPredicates returns the default predicate set.
- // If an equivalent pod was evaluated before, reuse its cached result
- // (common when one node runs several identical pods of the same workload).
- if predicate, exist := predicateFuncs[predicateKey]; exist {
- if eCacheAvailable {
- fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, predicateID, pod, metaToUse, nodeInfoToUse, equivClass)
- } else { // No usable cache: invoke the predicate directly.
- fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
- }
- if err != nil {
- return false, []algorithm.PredicateFailureReason{}, err
- }
-
- if !fit {
- // eCache is available and valid, and predicates result is unfit, record the fail reasons
- failedPredicates = append(failedPredicates, reasons...)
- // if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
- if !alwaysCheckAllPredicates {
- klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
- "evaluation is short circuited and there are chances " +
- "of other predicates failing as well.")
- break
- }
- }
- }
- }
- }
-
- return len(failedPredicates) == 0, failedPredicates, nil
- }
podFitsOnNode最难理解的就是for循环了两次,根据注释,大致意思如下:
1、第一次循环,将所有的优先级比较高或者相等的nominatedPods加入到Node中,更新meta和nodeInfo。nominatedPods是指已经分配到Node内但是还没有真正运行起来的Pods。这样做可以保证优先级高的Pods不会因为现在的Pod的加入而导致调度失败;
2、第二次调度,不将nominatedPods加入到Node内。这样的原因是因为考虑到像Pod affinity策略的话,如果当前的Pod依赖的是nominatedPods,这样就会有问题。因为,nominatedPods不能保证一定可以调度到相应的Node上。
之后就是根据预选的调度算法,一个个判断是否都满足。这里有个小优化,如果当前的Pod在之前有一个等价的Pod,则直接从缓存返回相应上一次的结果。如果成功则不用继续调用预选算法。但是,对于缓存部分,我个人有些疑问,可能对于上一个Pod缓存的结果是成功的,但是本次调度,Node信息发生变化了,缓存结果是成功的,但是实际上可能并不一定会成功。
默认预选调度算法
本节主要说的是默认的调度算法。默认的代码在pkg/scheduler/algorithmprovider/defaults/defaults.go下,defaultPredicates方法返回的是默认的一系列预选算法。与预选相关的代码都在pkg/scheduler/algorithm/predicates/predicates.go下
- // predicatesOrdering lists every predicate key in the fixed order they are
- // evaluated (consumed via predicates.Ordering() in podFitsOnNode).
- var (
- predicatesOrdering = []string{CheckNodeConditionPred, CheckNodeUnschedulablePred,
- GeneralPred, HostNamePred, PodFitsHostPortsPred,
- MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
- PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
- CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred,
- MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
- CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}
- )
对于每一个调度算法,有一个优先级Order,官网有详细的描述。
调度方法基本一致,参数为(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo),返回值为(bool, []algorithm.PredicateFailureReason, error)。
官网地址:
https://github.com/kubernetes/community/blob/master/contributors/design-proposals/scheduling/predicates-ordering.md
当然这个顺序是可以被配置文件覆盖的,用户可以使用类似这样的配置:
- {"kind" : "Policy","apiVersion" : "v1","predicates" : [
- {"name" : "PodFitsHostPorts", "order": 2},
- {"name" : "PodFitsResources", "order": 3},
- {"name" : "NoDiskConflict", "order": 5},
- {"name" : "PodToleratesNodeTaints", "order": 4},
- {"name" : "MatchNodeSelector", "order": 6},
- {"name" : "PodFitsHost", "order": 1}
- ],"priorities" : [
- {"name" : "LeastRequestedPriority", "weight" : 1},
- {"name" : "BalancedResourceAllocation", "weight" : 1},
- {"name" : "ServiceSpreadingPriority", "weight" : 1},
- {"name" : "EqualPriority", "weight" : 1}
- ],"hardPodAffinitySymmetricWeight" : 10}
具体的predicate函数
一直在讲predicate,那么predicate函数到底长什么样子呢,我们从具体的实现函数找一个看一下。开始讲design的时候提到过predicate的实现在pkg/scheduler/algorithm/predicates/predicates.go文件中,先看一眼Structure吧:
- // NoDiskConflict checks each of the pod's volumes against every pod already
- // on the node; the node is rejected on the first conflicting volume.
- func NoDiskConflict(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
- for _, v := range pod.Spec.Volumes {
- for _, ev := range nodeInfo.Pods() {
- if isVolumeConflict(v, ev) {
- return false, []algorithm.PredicateFailureReason{ErrDiskConflict}, nil
- }
- }
- }
- return true, nil, nil}
我们知道predicate函数的特点,这样就很好在这个一千六百多行go文件中寻找predicate函数了。像上面这个NoDiskConflict()函数,参数是pod、meta和nodeinfo,很明显是FitPredicate类型的,标准的predicate函数。这个函数的实现也特别简单,遍历pod的Volumes,然后对于pod的每一个Volume,遍历node上的每个pod,看是否和当前podVolume冲突。如果不fit就返回false加原因;如果fit就返回true,很清晰。
func (g *genericScheduler) Schedule()函数在预选完成之后会得到一个Node的数组。如果预选合适的节点数大于1,则需要调用优选算法根据评分获取最优的节点。
优选算法调用的接口是PrioritizeNodes。
优选算法调用的接口是PrioritizeNodes,使用与预选类似的多任务同步调用方式,采用MapReduce的思想,Map根据不同的优选算法获取对某一Node的值,根据Reduce统计最终的结果。
- // PrioritizeNodes scores every filtered node using a Map-Reduce style
- // pipeline: Map computes a per-node score for each priority config, Reduce
- // aggregates/normalizes, then the weighted scores are summed per node.
- func PrioritizeNodes(
- pod *v1.Pod,
- nodeNameToInfo map[string]*schedulercache.NodeInfo,
- meta interface{},
- priorityConfigs []algorithm.PriorityConfig,
- nodes []*v1.Node,
- extenders []algorithm.SchedulerExtender,
- ) (schedulerapi.HostPriorityList, error) {
- // If no priority configs are provided, then the EqualPriority function is applied
- // This is required to generate the priority list in the required format
- // No priority configuration: every node gets the same weight.
- if len(priorityConfigs) == 0 && len(extenders) == 0 {
- result := make(schedulerapi.HostPriorityList, 0, len(nodes))
- for i := range nodes {
- hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
- if err != nil {
- return nil, err
- }
- result = append(result, hostPriority)
- }
- return result, nil
- }
-
- var (
- mu = sync.Mutex{}
- wg = sync.WaitGroup{}
- errs []error
- )
- appendError := func(err error) {
- mu.Lock()
- defer mu.Unlock()
- errs = append(errs, err)
- }
- // results ([]schedulerapi.HostPriorityList) holds the outcome of every
- // algorithm applied to every node — effectively a 2-D matrix: one row per
- // algorithm, one cell per (algorithm, node) pair.
- // E.g. with 3 priority configs: result:=[[0]:[{1,1},{2,1},{3,1},...], [1]:[{1,1},{2,1},{3,1},...], [3]:[{1,1},{2,1},{3,1},...]]
- results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))
-
- // DEPRECATED: we can remove this when all priorityConfigs implement the
- // Map-Reduce pattern.
- for i := range priorityConfigs {
- if priorityConfigs[i].Function != nil {
- wg.Add(1)
- go func(index int) {
- defer wg.Done()
- var err error
- // Compute every node's score for priority config `index` (old-style Function).
- results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
- if err != nil {
- appendError(err)
- }
- }(i)
- } else {
- results[i] = make(schedulerapi.HostPriorityList, len(nodes))
- }
- }
-
- // Note: here index identifies the node, unlike above where index identified the priority config.
- workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
- nodeInfo := nodeNameToInfo[nodes[index].Name]
- for i := range priorityConfigs {
- // Walk every priority config; skip old-style Functions, continue with the new Map logic.
- if priorityConfigs[i].Function != nil {
- // The old path (priorityConfigs[i].Function) already ran above.
- // Function (old) and Map (new) are two alternative ways to fill results; each config uses exactly one.
- continue
- }
-
- var err error
- results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
- if err != nil {
- appendError(err)
- results[i][index].Host = nodes[index].Name
- }
- }
- })
-
- for i := range priorityConfigs {
- if priorityConfigs[i].Reduce == nil {
- continue
- }
- wg.Add(1)
- go func(index int) {
- defer wg.Done()
- if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
- appendError(err)
- }
- if klog.V(10) {
- for _, hostPriority := range results[index] {
- klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
- }
- }
- }(i)
- }
- // Wait for all computations to be finished.
- wg.Wait()
- if len(errs) != 0 {
- return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
- }
-
- // Summarize all scores.
- // result holds the final per-node score; note the difference between result and results.
- result := make(schedulerapi.HostPriorityList, 0, len(nodes))
-
- for i := range nodes {
- // Initialize node i's score to 0.
- result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
- for j := range priorityConfigs {
- // Sum the weighted score of every priority algorithm to obtain node i's final score.
- result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
- }
- }
-
- if len(extenders) != 0 && nodes != nil {
- combinedScores := make(map[string]int, len(nodeNameToInfo))
- for i := range extenders {
- if !extenders[i].IsInterested(pod) {
- continue
- }
- wg.Add(1)
- go func(extIndex int) {
- defer wg.Done()
- prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
- if err != nil {
- // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
- return
- }
- mu.Lock()
- for i := range *prioritizedList {
- host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
- if klog.V(10) {
- klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)
- }
- combinedScores[host] += score * weight
- }
- mu.Unlock()
- }(i)
- }
- // wait for all go routines to finish
- wg.Wait()
- for i := range result {
- result[i].Score += combinedScores[result[i].Host]
- }
- }
-
- if klog.V(10) {
- for i := range result {
- klog.Infof("Host %s => Score %d", result[i].Host, result[i].Score)
- }
- }
- return result, nil
- }
这段代码有两段代码感觉是重复了,代码如下:
- 代码1
- // DEPRECATED: we can remove this when all priorityConfigs implement the
- // Map-Reduce pattern.
- for i := range priorityConfigs {
- if priorityConfigs[i].Function != nil {
- wg.Add(1)
- go func(index int) {
- 。。。
- // Compute every node's score for config `index` (old-style Function).
- results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
- 。。。
- }(i)
- } else {
- results[i] = make(schedulerapi.HostPriorityList, len(nodes))
- }
- }
-
- // Snippet 2
- workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
- nodeInfo := nodeNameToInfo[nodes[index].Name]
- for i := range priorityConfigs {
- // Walk every priority config; skip old-style Functions, continue with the new Map logic.
- if priorityConfigs[i].Function != nil {
- // The old path (priorityConfigs[i].Function) already ran above.
- // Function (old) and Map (new) are two alternative ways to fill results; each config uses one.
- continue
- }
-
- var err error
- results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
- if err != nil {
- appendError(err)
- results[i][index].Host = nodes[index].Name
- }
- }
- })
-
-
- // —— divider —— type PriorityConfig struct {
- Name string
- Map PriorityMapFunction
- Reduce PriorityReduceFunction
- // TODO: Remove it after migrating all functions to
- // Map-Reduce pattern.
- Function PriorityFunction
- Weight int
- }
上述两段代码,其实做的是同一件事,就是遍历每个优先级算法函数func,计算每个节点在这个func上的得分。只是使用的方式不一样,我们也可以从PriorityConfig的定义可以看出来,PriorityConfig.Map会取代PriorityFunction。
优选调度算法默认代码在pkg/scheduler/algorithmprovider/defaults/defaults.go下,defaultPriorities方法返回的是默认的一系列优选算法,通过工厂模式处理相应的优选算法,代码如下:
- // defaultPriorities returns the names of the default priority (scoring)
- // algorithms, registering each one through the factory.
- func defaultPriorities() sets.String {
- return sets.NewString(
- // spreads pods by minimizing the number of pods (belonging to the same service or replication controller) on the same node.
- factory.RegisterPriorityConfigFactory(
- "SelectorSpreadPriority",
- factory.PriorityConfigFactory{
- MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
- return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
- },
- Weight: 1,
- },
- ),
- // pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.)
- // as some other pods, or, conversely, should not be placed in the same topological domain as some other pods.
- factory.RegisterPriorityConfigFactory(
- "InterPodAffinityPriority",
- factory.PriorityConfigFactory{
- Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
- return priorities.NewInterPodAffinityPriority(args.NodeInfo, args.NodeLister, args.PodLister, args.HardPodAffinitySymmetricWeight)
- },
- Weight: 1,
- },
- ),
-
- // Prioritize nodes by least requested utilization.
- factory.RegisterPriorityFunction2("LeastRequestedPriority", priorities.LeastRequestedPriorityMap, nil, 1),
-
- // Prioritizes nodes to help achieve balanced resource usage
- factory.RegisterPriorityFunction2("BalancedResourceAllocation", priorities.BalancedResourceAllocationMap, nil, 1),
-
- // Set this weight large enough to override all other priority functions.
- // TODO: Figure out a better way to do this, maybe at same time as fixing #24720.
- factory.RegisterPriorityFunction2("NodePreferAvoidPodsPriority", priorities.CalculateNodePreferAvoidPodsPriorityMap, nil, 10000),
-
- // Prioritizes nodes that have labels matching NodeAffinity
- factory.RegisterPriorityFunction2("NodeAffinityPriority", priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1),
-
- // Prioritizes nodes that marked with taint which pod can tolerate.
- factory.RegisterPriorityFunction2("TaintTolerationPriority", priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1),
-
- // ImageLocalityPriority prioritizes nodes that have images requested by the pod present.
- factory.RegisterPriorityFunction2("ImageLocalityPriority", priorities.ImageLocalityPriorityMap, nil, 1),
- )
- }
-
Function和Map-Reduce实例分析
InterPodAffinityPriority(Function)
这个算法做的是Pod间亲和性优选,也就是亲和pod越多的节点分值越高,反亲和pod越多分值越低。我们撇开具体的亲和性计算规则,从优选函数的形式上看一下这段代码的逻辑:
- // Source: pkg/scheduler/algorithm/priorities/interpod_affinity.go:119func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
- affinity := pod.Spec.Affinity
- // Does the pod declare affinity constraints?
- hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil
- // Does the pod declare anti-affinity constraints?
- hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil
-
- // (Omitted here: the logic that counts matching pods per node from the
- // affinity/anti-affinity rules; we focus on the shape of a priority Function.)
-
- // After walking all nodes we know the highest and lowest counts, recorded as maxCount and minCount.
- for _, node := range nodes {
- if pm.counts[node.Name] > maxCount {
- maxCount = pm.counts[node.Name]
- }
- if pm.counts[node.Name] < minCount {
- minCount = pm.counts[node.Name]
- }
- }
- // result has the same type seen earlier: it stores this single algorithm's output.
- result := make(schedulerapi.HostPriorityList, 0, len(nodes))
- for _, node := range nodes {
- fScore := float64(0)
- // If the spread is positive (the nodes differ), normalize the score.
- if (maxCount - minCount) > 0 {
- // MaxPriority is the top score 10; the second factor is this node's
- // count minus minCount, divided by (maxCount - minCount). Example:
- // count=5, maxCount=20, minCount=-3 gives 10*[5-(-3)/20-(-3)],
- // which always lands in [0-10].
- fScore = float64(schedulerapi.MaxPriority) * ((pm.counts[node.Name] - minCount) / (maxCount - minCount))
- }
- // If the spread is zero, int(fScore) is 0 for every node.
- result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
- }
- return result, nil}
如上,我们可以发现最终这个函数计算出了每个node的分值,这个分值在[0-10]之间。所以说到底Function做的事情就是根据一定的规则给每个node赋一个分值,这个分值要求在[0-10]之间,然后把这个HostPriorityList返回就行。
CalculateNodeAffinityPriorityMap(Map)
这个算法和上一个类似,上一个是Pod的Affinity,这个是Node的Affinity;我们来看代码:
- // Source: pkg/scheduler/algorithm/priorities/node_affinity.go:34func CalculateNodeAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
- node := nodeInfo.Node()
- if node == nil {
- return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
- }
-
- // default is the podspec.
- affinity := pod.Spec.Affinity
- if priorityMeta, ok := meta.(*priorityMetadata); ok {
- // We were able to parse metadata, use affinity from there.
- affinity = priorityMeta.affinity
- }
-
- var count int32
- if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
- // Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
- for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
- preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
- if preferredSchedulingTerm.Weight == 0 {
- continue
- }
-
- nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
- if err != nil {
- return schedulerapi.HostPriority{}, err
- }
- if nodeSelector.Matches(labels.Set(node.Labels)) {
- count += preferredSchedulingTerm.Weight
- }
- }
- }
-
- // Note: the accumulated weight is returned as-is (not normalized here).
- return schedulerapi.HostPriority{
- Host: node.Name,
- Score: int(count),
- }, nil}
撇开具体的亲和性计算细节,我们可以发现这个的count没有特定的规则,可能会加到10以上;另外这里的返回值是HostPriority类型,前面的Function返回了HostPriorityList类型。
map函数
- // Map function: scores a node by how many pods on it match the pod's first service selector.
- pkg/scheduler/algorithm/priorities/selector_spreading.go:221func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
- var firstServiceSelector labels.Selector
-
- node := nodeInfo.Node()
- if node == nil {
- return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
- }
- priorityMeta, ok := meta.(*priorityMetadata)
- if ok {
- firstServiceSelector = priorityMeta.podFirstServiceSelector
- } else {
- firstServiceSelector = getFirstServiceSelector(pod, s.serviceLister)
- }
- // Find pods on this node, in the pod's namespace, that match the selector; returns []*v1.Pod.
- matchedPodsOfNode := filteredPod(pod.Namespace, firstServiceSelector, nodeInfo)
-
- return schedulerapi.HostPriority{
- Host: node.Name,
- // Score is simply the number of matched pods found above.
- Score: int(len(matchedPodsOfNode)),
- }, nil}
这个函数比较短,可以看到在指定node上查询到匹配selector的pod越多,分值就越高。假设找到了20个,那么这里的分值就是20;假设找到的是2,那这里的分值就是2。
CalculateNodeAffinityPriorityReduce(Reduce)
和上面这个Map对应的Reduce函数其实没有单独实现,通过NormalizeReduce函数做了一个通用的Reduce处理:
-
- pkg/scheduler/algorithm/priorities/node_affinity.go:77var CalculateNodeAffinityPriorityReduce = NormalizeReduce(schedulerapi.MaxPriority, false)pkg/scheduler/algorithm/priorities/reduce.go:29func NormalizeReduce(maxPriority int, reverse bool) algorithm.PriorityReduceFunction {
- // NormalizeReduce returns a generic Reduce function that rescales a
- // HostPriorityList so the highest score becomes maxPriority, optionally
- // reversing the scores.
- return func(
- _ *v1.Pod,
- _ interface{},
- _ map[string]*schedulercache.NodeInfo,
- // Note: result is a HostPriorityList — one algorithm's scores for all nodes.
- result schedulerapi.HostPriorityList) error {
-
- var maxCount int
- // Scan result and record the highest score in maxCount.
- for i := range result {
- if result[i].Score > maxCount {
- maxCount = result[i].Score
- }
- }
-
- if maxCount == 0 {
- if reverse {
- for i := range result {
- result[i].Score = maxPriority
- }
- }
- return nil
- }
-
- for i := range result {
- score := result[i].Score
- // Example: 10*(5/20)
- score = maxPriority * score / maxCount
- if reverse {
- // With reverse, a score of 3 becomes 7 and 4 becomes 6 — the ranking is inverted.
- score = maxPriority - score
- }
-
- result[i].Score = score
- }
- return nil
- }}
map-reduce小节
sched.scheduleOne -> sched.preempt -> func (g *genericScheduler) Preempt
详见k8s源码分析--kube-scheduler源码(二)
https://juejin.im/post/5c889c2e5188257df700a732
https://www.kubernetes.org.cn/5122.html
https://www.cnblogs.com/cloudgeek/p/10561221.html
https://www.kubernetes.org.cn/5221.html
http://tang.love/2018/07/24/learning-kubernetes-source-code/
https://my.oschina.net/u/3797264/blog/2615842
https://my.oschina.net/jxcdwangtao/blog/1594348
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。