当前位置:   article > 正文

k8s源码分析--kube-scheduler源码(一)_kubernetes scheduler 源码

kubernetes scheduler 源码

版本:v1.13.0

启动分析

kubernetes基础组件的入口均在cmd目录下,kube-schduler入口在scheduler.go下。
kubernetes所有的组件启动采用的均是command的形式,引用的是spf13类库。

  1. func main() {
  2. rand.Seed(time.Now().UnixNano())
  3. //创建Cobra格式的Scheduler command
  4. command := app.NewSchedulerCommand()
  5. // TODO: once we switch everything over to Cobra commands, we can go back to calling
  6. // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
  7. // normalize func and add the go flag set by hand.
  8. //将配置中的‘_’字符转化为‘-’字符
  9. pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
  10. // utilflag.InitFlags()
  11. logs.InitLogs()
  12. defer logs.FlushLogs()
  13. //执行Scheduler command
  14. if err := command.Execute(); err != nil {
  15. fmt.Fprintf(os.Stderr, "%v\n", err)
  16. os.Exit(1)
  17. }
  18. }

通过将配置文件转化成command的形式,调用Execute方法执行定义的Run方法

  1. Run: func(cmd *cobra.Command, args []string) {
  2. if err := runCommand(cmd, args, opts); err != nil {
  3. fmt.Fprintf(os.Stderr, "%v\n", err)
  4. os.Exit(1)
  5. }
  6. },

进入runCommand方法,通过完成配置的初始化,调用Run方法,进一步启动。

  1. // runCommand runs the scheduler.func runCommand(cmd *cobra.Command, args []string, opts *options.Options) error {
  2. 。。。
  3. // Get the completed config
  4. cc := c.Complete()
  5. // To help debugging, immediately log version
  6. klog.Infof("Version: %+v", version.Get())
  7. // Apply algorithms based on feature gates.
  8. // TODO: make configurable?
  9. algorithmprovider.ApplyFeatureGates()
  10. // Configz registration.
  11. if cz, err := configz.New("componentconfig"); err == nil {
  12. cz.Set(cc.ComponentConfig)
  13. } else {
  14. return fmt.Errorf("unable to register configz: %s", err)
  15. }
  16. return Run(cc, stopCh)
  17. }

Run方法分析

Run方法主要做了以下工作:
1、判断是否需要添加VolumeScheduling新特性;
2、初始化调度参数的相关结构体;
3、配置准备事件广播;
4、健康检查相关配置;
5、Metrics相关配置;
6、启动所有的Informer(kubernetes主要就是通过Informer和Workqueue机制监听事件的变化);
7、判断是否需要LeaderElection,决定最终的启动

调度入口

Run(cc, stopCh)->sched.Run()->sched.scheduleOne

scheduleOne方法分析

cheduleOne,顾名思义,每次调度一个Pod,整体文件如

  1. func (sched *Scheduler) scheduleOne() {
  2. pod := sched.config.NextPod()
  3. // pod could be nil when schedulerQueue is closed
  4. if pod == nil {
  5. return
  6. }
  7. if pod.DeletionTimestamp != nil {
  8. sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
  9. klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
  10. return
  11. }
  12. klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)
  13. // Synchronously attempt to find a fit for the pod.
  14. start := time.Now()
  15. // ljs:调度算法:最终的调度在generic_scheduler.go的Schedule方法
  16. // ljs:schedule()可能已经失败,因为pod不适合任何主机,
  17. // 因此我们尝试抢占,期望下次尝试pod进行调度时,由于抢占,它将适合。也可以安排不同的pod进入被抢占的资源,但这是无害的。
  18. suggestedHost, err := sched.schedule(pod)
  19. if err != nil {
  20. // ljs:当通过正常的调度流程如果没有找到合适的节点(主要是预选没有合适的节点),
  21. // 会判断需不需要进行抢占调度,具体的代码在pkg/scheduler/scheduler.go文件下,用到的方法preempt
  22. if fitError, ok := err.(*core.FitError); ok {
  23. preemptionStartTime := time.Now()
  24. sched.preempt(pod, fitError)
  25. metrics.PreemptionAttempts.Inc()
  26. metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
  27. metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
  28. metrics.PodScheduleFailures.Inc()
  29. } else {
  30. klog.Errorf("error selecting node for pod: %v", err)
  31. metrics.PodScheduleErrors.Inc()
  32. }
  33. return
  34. }
  35. metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
  36. // 3.Pod与Node缓存,保证调度一直进行,不用等待每次绑定完成(绑定是一个耗时的过程)
  37. assumedPod := pod.DeepCopy()
  38. // ljs: 一个Pod被计划调度到机器A的事实被称为assume调度,即假定调度,
  39. // 这些调度安排被保存在特定的队列里,此时调度过程是能看到这个预安排的,因而影响到其他Pod的调度。
  40. allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
  41. if err != nil {
  42. klog.Errorf("error assuming volumes: %v", err)
  43. metrics.PodScheduleErrors.Inc()
  44. return
  45. }
  46. // assume modifies `assumedPod` by setting NodeName=suggestedHost //ljs: 5. Pod对应的NodeName写上主机名,存入缓存
  47. err = sched.assume(assumedPod, suggestedHost)
  48. if err != nil {
  49. klog.Errorf("error assuming pod: %v", err)
  50. metrics.PodScheduleErrors.Inc()
  51. return
  52. }
  53. // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
  54. //ljs: 6. 请求apiserver,异步处理最终的绑定,写入到etcd
  55. go func() {
  56. // Bind volumes first before Pod
  57. if !allBound {
  58. err := sched.bindVolumes(assumedPod)
  59. if err != nil {
  60. klog.Errorf("error binding volumes: %v", err)
  61. metrics.PodScheduleErrors.Inc()
  62. return
  63. }
  64. }
  65. err := sched.bind(assumedPod, &v1.Binding{
  66. ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
  67. Target: v1.ObjectReference{
  68. Kind: "Node",
  69. Name: suggestedHost,
  70. },
  71. })
  72. metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
  73. if err != nil {
  74. klog.Errorf("error binding pod: %v", err)
  75. metrics.PodScheduleErrors.Inc()
  76. } else {
  77. metrics.PodScheduleSuccesses.Inc()
  78. }
  79. }()
  80. }

主要做了以下工作:
1、从队列中取出待调度的Pod
2、根据调度算法(预选+优选)获取待调度Pod匹配的主机,如果未获取到合适的主机,判断是否需要preempt,即Pod的抢占策略,为Pod分配节点
3、将当前Pod缓存起来,假定已经绑定成功(主要是为了将scheduling与binding过程分开)
4、判断是否需要VolumeScheduling特性继续添加Pod信息
5、Pod对应的NodeName写上主机名(调度的本质就是将为空的NodeName写上相应的Node的值)
6、启动新的binding协程,请求apiserver,异步处理最终的绑定,将结果写入到etcd中

调度算法

sched.scheduleOne -> sched.schedule(pod) -> generic_scheduler.go

  1. func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
  2. trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
  3. defer trace.LogIfLong(100 * time.Millisecond)
  4. // 对pod做一些基础检查,及检查pod对应的pvc
  5. if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
  6. return "", err
  7. }
  8. //取得node list列表
  9. nodes, err := nodeLister.List()
  10. if err != nil {
  11. return "", err
  12. }
  13. ...
  14. trace.Step("Computing predicates")
  15. startPredicateEvalTime := time.Now()
  16. //ljs:调度算法预选
  17. filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
  18. if err != nil {
  19. return "", err
  20. }
  21. ...
  22. metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
  23. //优选算法调用的接口,执行PrioritizeNodes方法对通过预选的node进行优选打分
  24. priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders) //ljs:优选算法
  25. if err != nil {
  26. return "", err
  27. }
  28. metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
  29. metrics.SchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))
  30. trace.Step("Selecting host")
  31. //最后找出一个优选分数最高的node,如果有node优选分数一样,则随机返回一个分数最高的node
  32. return g.selectHost(priorityList)
  33. }

如果未自定义调度器,则启用默认的调度器genericScheduler,genericScheduler的Schedule方法如下:

  • 对pod做一些基础检查,及检查pod对应的pvc
  • 取得node list列表
  • 执行genericScheduler.findNodesThatFit方法进行预选
  • 执行PrioritizeNodes方法对通过预选的node进行优选打分
  • 最后找出一个优选分数最高的node,如果有node优选分数一样,则随机返回一个分数最高的node

预选

预选算法调用的接口是findNodesThatFit,主要代码如下:

  1. func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
  2. var filtered []*v1.Node
  3. failedPredicateMap := FailedPredicateMap{}
  4. //if表示,如果没有配置预选的算法,则直接将所有的Node写入匹配数组
  5. if len(g.predicates) == 0 {
  6. filtered = nodes
  7. } else {
  8. allNodes := int32(g.cache.NodeTree().NumNodes)
  9. // numFeasibleNodesToFind保证一次性不用返回过多的Node数量,避免数组过大
  10. numNodesToFind := g.numFeasibleNodesToFind(allNodes)
  11. // Create filtered list with enough space to avoid growing it
  12. // and allow assigning.
  13. filtered = make([]*v1.Node, numNodesToFind)
  14. errs := errors.MessageCountMap{}
  15. var (
  16. predicateResultLock sync.Mutex
  17. filteredLen int32
  18. equivClass *equivalence.Class
  19. )
  20. ctx, cancel := context.WithCancel(context.Background())
  21. // We can use the same metadata producer for all nodes.
  22. meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)
  23. if g.equivalenceCache != nil {
  24. // getEquivalenceClassInfo will return immediately if no equivalence pod found
  25. equivClass = equivalence.NewClass(pod)
  26. }
  27. // checkNode处理预选策略
  28. checkNode := func(i int) {
  29. var nodeCache *equivalence.NodeCache
  30. // 每次获取Node信息
  31. nodeName := g.cache.NodeTree().Next()
  32. if g.equivalenceCache != nil {
  33. nodeCache = g.equivalenceCache.LoadNodeCache(nodeName)
  34. }
  35. // 最终实现调度判断的接口
  36. fits, failedPredicates, err := podFitsOnNode(
  37. pod,
  38. meta,
  39. g.cachedNodeInfoMap[nodeName],
  40. g.predicates,
  41. nodeCache,
  42. g.schedulingQueue,
  43. g.alwaysCheckAllPredicates,
  44. equivClass,
  45. )
  46. if err != nil {
  47. predicateResultLock.Lock()
  48. errs[err.Error()]++
  49. predicateResultLock.Unlock()
  50. return
  51. }
  52. if fits {
  53. // 保证获取的Node数量在numNodesToFind内
  54. length := atomic.AddInt32(&filteredLen, 1)
  55. if length > numNodesToFind {
  56. // 通知ParallelizeUntil任务结束
  57. cancel()
  58. atomic.AddInt32(&filteredLen, -1)
  59. } else {
  60. filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
  61. }
  62. } else {
  63. predicateResultLock.Lock()
  64. failedPredicateMap[nodeName] = failedPredicates
  65. predicateResultLock.Unlock()
  66. }
  67. }
  68. // Stops searching for more nodes once the configured number of feasible nodes
  69. // are found.
  70. // 并行处理多个Node的checkNode工作
  71. workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
  72. filtered = filtered[:filteredLen]
  73. if len(errs) > 0 {
  74. return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
  75. }
  76. }
  77. //ljs:如果配置了调度的扩展算法,需要继续对筛选后的Pod与Node进行再一次的筛选,获取最终匹配的Node列表。
  78. if len(filtered) > 0 && len(g.extenders) != 0 {
  79. for _, extender := range g.extenders {
  80. if !extender.IsInterested(pod) {
  81. continue
  82. }
  83. filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
  84. if err != nil {
  85. if extender.IsIgnorable() {
  86. klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
  87. extender, err)
  88. continue
  89. } else {
  90. return []*v1.Node{}, FailedPredicateMap{}, err
  91. }
  92. }
  93. for failedNodeName, failedMsg := range failedMap {
  94. if _, found := failedPredicateMap[failedNodeName]; !found {
  95. failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
  96. }
  97. failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
  98. }
  99. filtered = filteredList
  100. if len(filtered) == 0 {
  101. break
  102. }
  103. }
  104. }
  105. return filtered, failedPredicateMap, nil
  106. }

findNodesThatFit主要做了几个操作
1、判断是否配置了预选算法,如果没有,直接返回Node列表信息;
2、如果配置了预选算法,则同时对多个Node(最多一次16个)调用checkNode方法,判断Pod是否可以调度在该Node上;
3、预选筛选之后,如果配置了调度的扩展算法,需要继续对筛选后的Pod与Node进行再一次的筛选,获取最终匹配的Node列表。

这里有一个注意的地方,获取匹配的Node节点数量时,通过numFeasibleNodesToFind函数限制了每次获取的节点数,最大值为100。这样当匹配到相应的Node数时,checkNode方法不再调用。
这里个人觉着有些问题,当Node数量足够多的时候(大于100),由于numFeasibleNodesToFind限制了Node数量,导致并不能扫描到所有的Node,这样可能导致最合适的Node没有被扫描到,匹配到的只是较优先的Node,则最终调度到的Node也不是最合适的Node,只是相较于比较合适。

预选调度实际接口:podFitsOnNode

最终实现调度判断的接口是podFitsOnNode。

这里的逻辑是从一个for循环开始的,关于这个2次循环的含义代码里有很长的一段注释,我们先看一下注释里怎么说的(这里可以多看几遍体会一下):
出于某些原因考虑我们需要运行两次predicate. 如果node上有更高或者相同优先级的“指定pods”(这里的“指定pods”指的是通过schedule计算后指定要跑在一个node上但是还未真正运行到那个node上的pods),我们将这些pods加入到meta和nodeInfo后执行一次计算过程。如果这个过程所有的predicates都成功了,我们再假设这些“指定pods”不会跑到node上再运行一次。第二次计算是必须的,因为有一些predicates比如pod亲和性,也许在“指定pods”没有成功跑到node的情况下会不满足。如果没有“指定pods”或者第一次计算过程失败了,那么第二次计算不会进行。我们在第一次调度的时候只考虑相等或者更高优先级的pods,因为这些pod是当前pod必须“臣服”的,也就是说不能够从这些pod中抢到资源,这些pod不会被当前pod“抢占”;这样当前pod也就能够安心从低优先级的pod手里抢资源了。新pod在上述2种情况下都可调度基于一个保守的假设:资源和pod反亲和性等的predicate在“指定pods”被处理为Running时更容易失败;pod亲和性在“指定pods”被处理为Not Running时更加容易失败。我们不能假设“指定pods”是Running的因为它们当前还没有运行,而且事实上,它们确实有可能最终又被调度到其他node上了。

  1. func podFitsOnNode(
  2. pod *v1.Pod,
  3. meta algorithm.PredicateMetadata,
  4. info *schedulercache.NodeInfo,
  5. predicateFuncs map[string]algorithm.FitPredicate,
  6. nodeCache *equivalence.NodeCache,
  7. queue internalqueue.SchedulingQueue,
  8. alwaysCheckAllPredicates bool,
  9. equivClass *equivalence.Class,
  10. ) (bool, []algorithm.PredicateFailureReason, error) {
  11. var (
  12. eCacheAvailable bool
  13. failedPredicates []algorithm.PredicateFailureReason
  14. )
  15. podsAdded := false
  16. // ljs:第一次循环,将所有的优先级比较高或者相等的nominatedPods加入到Node中,
  17. // 更新meta和nodeInfo。nominatedPods是指已经分配到Node内但是还没有真正运行起来的Pods。
  18. // 这样做可以保证优先级高的Pods不会因为现在的Pod的加入而导致调度失败;
  19. // ljs:第一次调度,根据NominatedPods更新meta和nodeInfo信息,pod根据更新后的信息去预选
  20. // ljs:第二次调度,meta和nodeInfo信息不变,保证pod不完全依赖于NominatedPods
  21. //(主要考虑到pod亲和性之类的,比如某个nominatedPod没有在这个节点上运行,predicate可能会失败)
  22. // ljs:不将nominatedPods加入到Node内。这样的原因是因为考虑到像Pod affinity策略的话,如果当前的Pod依赖的是nominatedPods,
  23. // 这样就会有问题。因为,nominatedPods不能保证一定可以调度到相应的Node上。
  24. for i := 0; i < 2; i++ {
  25. metaToUse := meta
  26. nodeInfoToUse := info
  27. if i == 0 {
  28. podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
  29. } else if !podsAdded || len(failedPredicates) != 0 {
  30. break
  31. }
  32. // Bypass eCache if node has any nominated pods.
  33. // TODO(bsalamat): consider using eCache and adding proper eCache invalidations
  34. // when pods are nominated or their nominations change.
  35. eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
  36. for predicateID, predicateKey := range predicates.Ordering() {
  37. var (
  38. fit bool
  39. reasons []algorithm.PredicateFailureReason
  40. err error
  41. )
  42. //TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
  43. // predicate相关函数在sc, err := configurator.CreateFromConfig(*policy)进行注册,
  44. // 具体代码在:pkg/scheduler/algorithmprovider/defaults/defaults.go下,defaultPredicates 方法返回的是默认的一系列预选算法。
  45. // ljs:如果当前pod在之前有一个等价pod,则直接从缓存中返回相应的上一次结果(一个节点上有多个相同的pod要发布)
  46. if predicate, exist := predicateFuncs[predicateKey]; exist {
  47. if eCacheAvailable {
  48. fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, predicateID, pod, metaToUse, nodeInfoToUse, equivClass)
  49. } else { //ljs:直接调用预选算法
  50. fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
  51. }
  52. if err != nil {
  53. return false, []algorithm.PredicateFailureReason{}, err
  54. }
  55. if !fit {
  56. // eCache is available and valid, and predicates result is unfit, record the fail reasons
  57. failedPredicates = append(failedPredicates, reasons...)
  58. // if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
  59. if !alwaysCheckAllPredicates {
  60. klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
  61. "evaluation is short circuited and there are chances " +
  62. "of other predicates failing as well.")
  63. break
  64. }
  65. }
  66. }
  67. }
  68. }
  69. return len(failedPredicates) == 0, failedPredicates, nil
  70. }

podFitsOnNode最难理解的就是for循环了两次,根据注释,大致意思如下:
1、第一次循环,将所有的优先级比较高或者相等的nominatedPods加入到Node中,更新meta和nodeInfo。nominatedPods是指已经分配到Node内但是还没有真正运行起来的Pods。这样做可以保证优先级高的Pods不会因为现在的Pod的加入而导致调度失败;
2、第二次调度,不将nominatedPods加入到Node内。这样的原因是因为考虑到像Pod affinity策略的话,如果当前的Pod依赖的是nominatedPods,这样就会有问题。因为,nominatedPods不能保证一定可以调度到相应的Node上。

之后就是根据预选的调度算法,一个个判断是否都满足。这里有个小优化,如果当前的Pod在之前有一个等价的Pod,则直接从缓存返回相应上一次的结果。如果成功则不用继续调用预选算法。但是,对于缓存部分,我个人有些疑问,可能对于上一个Pod缓存的结果是成功的,但是本次调度,Node信息发生变化了,缓存结果是成功的,但是实际上可能并不一定会成功。

默认预选调度算法

本节主要说的是默认的调度算法。默认的代码在pkg/scheduler/algorithmprovider/defaults/defaults.go下,defaultPredicates方法返回的是默认的一系列预选算法。与预选相关的代码都在pkg/scheduler/algorithm/predicates/predicates.go下

  1. var (
  2. predicatesOrdering = []string{CheckNodeConditionPred, CheckNodeUnschedulablePred,
  3. GeneralPred, HostNamePred, PodFitsHostPortsPred,
  4. MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
  5. PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
  6. CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred,
  7. MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
  8. CheckNodeMemoryPressurePred, CheckNodePIDPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}
  9. )

对于每一个调度算法,有一个优先级Order,官网有详细的描述。
调度方法基本一致,参数为(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo),返回值为(bool, []algorithm.PredicateFailureReason, error)。
官网地址:
https://github.com/kubernetes/community/blob/master/contributors/design-proposals/scheduling/predicates-ordering.md

当然这个顺序是可以被配置文件覆盖的,用户可以使用类似这样的配置:

  1. {"kind" : "Policy","apiVersion" : "v1","predicates" : [
  2. {"name" : "PodFitsHostPorts", "order": 2},
  3. {"name" : "PodFitsResources", "order": 3},
  4. {"name" : "NoDiskConflict", "order": 5},
  5. {"name" : "PodToleratesNodeTaints", "order": 4},
  6. {"name" : "MatchNodeSelector", "order": 6},
  7. {"name" : "PodFitsHost", "order": 1}
  8. ],"priorities" : [
  9. {"name" : "LeastRequestedPriority", "weight" : 1},
  10. {"name" : "BalancedResourceAllocation", "weight" : 1},
  11. {"name" : "ServiceSpreadingPriority", "weight" : 1},
  12. {"name" : "EqualPriority", "weight" : 1}
  13. ],"hardPodAffinitySymmetricWeight" : 10}

具体的predicate函数

一直在讲predicate,那么predicate函数到底长什么样子呢,我们从具体的实现函数找一个看一下。开始讲design的时候提到过predicate的实现在pkg/scheduler/algorithm/predicates/predicates.go文件中,先看一眼Structure吧:

  1. func NoDiskConflict(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
  2. for _, v := range pod.Spec.Volumes {
  3. for _, ev := range nodeInfo.Pods() {
  4. if isVolumeConflict(v, ev) {
  5. return false, []algorithm.PredicateFailureReason{ErrDiskConflict}, nil
  6. }
  7. }
  8. }
  9. return true, nil, nil}

我们知道predicate函数的特点,这样就很好在这个一千六百多行go文件中寻找predicate函数了。像上面这个NoDiskConflict()函数,参数是pod、meta和nodeinfo,很明显是FitPredicate类型的,标准的predicate函数。这个函数的实现也特别简单,遍历pod的Volumes,然后对于pod的每一个Volume,遍历node上的每个pod,看是否和当前podVolume冲突。如果不fit就返回false加原因;如果fit就返回true,很清晰。

优选

func (g *genericScheduler) Schedule()函数在
预选完成之后会得到一个Node的数组。如果预选合适的节点数大于1,则需要调用优选算法根据评分获取最优的节点。
优选算法调用的接口是PrioritizeNodes。

优选调度算法

优选算法调用的接口是PrioritizeNodes,使用与预选类似的多任务同步调用方式,采用MapReduce的思想,Map根据不同的优选算法获取对某一Node的值,根据Reduce统计最终的结果。

  • PrioritizeNodes要做的事情是给已经通过predicate的nodes赋上一个分值,从而抉出一个最优node用于运行当前pod.
  • PrioritizeNodes通过并发调用一个个priority函数来给node排优先级。每一个priority函数会给一个1-10之间的分值,0最低10最高。每一个priority函数可以有自己的权重,单个函数返回的分值*权重后得到一个加权分值,最终所有的加权分值加在一起就是这个node的最终分值。
  1. func PrioritizeNodes(
  2. pod *v1.Pod,
  3. nodeNameToInfo map[string]*schedulercache.NodeInfo,
  4. meta interface{},
  5. priorityConfigs []algorithm.PriorityConfig,
  6. nodes []*v1.Node,
  7. extenders []algorithm.SchedulerExtender,
  8. ) (schedulerapi.HostPriorityList, error) {
  9. // If no priority configs are provided, then the EqualPriority function is applied
  10. // This is required to generate the priority list in the required format
  11. // ljs:没有优选配置,默认每个节点等权重
  12. if len(priorityConfigs) == 0 && len(extenders) == 0 {
  13. result := make(schedulerapi.HostPriorityList, 0, len(nodes))
  14. for i := range nodes {
  15. hostPriority, err := EqualPriorityMap(pod, meta, nodeNameToInfo[nodes[i].Name])
  16. if err != nil {
  17. return nil, err
  18. }
  19. result = append(result, hostPriority)
  20. }
  21. return result, nil
  22. }
  23. var (
  24. mu = sync.Mutex{}
  25. wg = sync.WaitGroup{}
  26. errs []error
  27. )
  28. appendError := func(err error) {
  29. mu.Lock()
  30. defer mu.Unlock()
  31. errs = append(errs, err)
  32. }
  33. // 最后一个变量results也不难理解,类型是[]schedulerapi.HostPriorityList,这里需要注意这个类型
  34. // 的作用,它保存的是所有算法作用所有node之后得到的结果集,相当于一个二维数组,每个格子是1个算法
  35. // 作用于1个节点的结果,一行也就是1个算法作用于所有节点的结果;一行展成一个二维就是所有算法作用于所有节点;
  36. // 假设有3中优先级配置:result:=[[0]:[{1,1},{2,1},{3,1},...], [1]:[{1,1},{2,1},{3,1},...], [3]:[{1,1},{2,1},{3,1},...]]
  37. results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))
  38. // DEPRECATED: we can remove this when all priorityConfigs implement the
  39. // Map-Reduce pattern.
  40. for i := range priorityConfigs {
  41. if priorityConfigs[i].Function != nil {
  42. wg.Add(1)
  43. go func(index int) {
  44. defer wg.Done()
  45. var err error
  46. // ljs:求出每个节点在配置index上的得分
  47. results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
  48. if err != nil {
  49. appendError(err)
  50. }
  51. }(i)
  52. } else {
  53. results[i] = make(schedulerapi.HostPriorityList, len(nodes))
  54. }
  55. }
  56. // 这里的index是node的序号,和上面的index不同,上面的index是指priorityConfigs的序号
  57. workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
  58. nodeInfo := nodeNameToInfo[nodes[index].Name]
  59. for i := range priorityConfigs {
  60. // 这个for循环遍历的是所有的优选配置,如果有老Fun就跳过,新逻辑就继续;
  61. if priorityConfigs[i].Function != nil {
  62. // 因为前面old已经运行过了,也就是priorityConfigs[i].Function
  63. // 这里是两种计算result的方法,选择其中一种就行,Function是old,map是新方法
  64. continue
  65. }
  66. var err error
  67. results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
  68. if err != nil {
  69. appendError(err)
  70. results[i][index].Host = nodes[index].Name
  71. }
  72. }
  73. })
  74. for i := range priorityConfigs {
  75. if priorityConfigs[i].Reduce == nil {
  76. continue
  77. }
  78. wg.Add(1)
  79. go func(index int) {
  80. defer wg.Done()
  81. if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
  82. appendError(err)
  83. }
  84. if klog.V(10) {
  85. for _, hostPriority := range results[index] {
  86. klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Host, priorityConfigs[index].Name, hostPriority.Score)
  87. }
  88. }
  89. }(i)
  90. }
  91. // Wait for all computations to be finished.
  92. wg.Wait()
  93. if len(errs) != 0 {
  94. return schedulerapi.HostPriorityList{}, errors.NewAggregate(errs)
  95. }
  96. // Summarize all scores.
  97. //result用于存储每个node的Score,注意区别result和results
  98. result := make(schedulerapi.HostPriorityList, 0, len(nodes))
  99. for i := range nodes {
  100. // 初始化节点i的得分score为0
  101. result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
  102. for j := range priorityConfigs {
  103. // 遍历所有优先级算法,每个算法有一个加权得分,累加就可以得到节点i的最终得分
  104. result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
  105. }
  106. }
  107. if len(extenders) != 0 && nodes != nil {
  108. combinedScores := make(map[string]int, len(nodeNameToInfo))
  109. for i := range extenders {
  110. if !extenders[i].IsInterested(pod) {
  111. continue
  112. }
  113. wg.Add(1)
  114. go func(extIndex int) {
  115. defer wg.Done()
  116. prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
  117. if err != nil {
  118. // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
  119. return
  120. }
  121. mu.Lock()
  122. for i := range *prioritizedList {
  123. host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
  124. if klog.V(10) {
  125. klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, extenders[extIndex].Name(), score)
  126. }
  127. combinedScores[host] += score * weight
  128. }
  129. mu.Unlock()
  130. }(i)
  131. }
  132. // wait for all go routines to finish
  133. wg.Wait()
  134. for i := range result {
  135. result[i].Score += combinedScores[result[i].Host]
  136. }
  137. }
  138. if klog.V(10) {
  139. for i := range result {
  140. klog.Infof("Host %s => Score %d", result[i].Host, result[i].Score)
  141. }
  142. }
  143. return result, nil
  144. }

这段代码有两段代码感觉是重复了,代码如下:

  1. 代码1
  2. // DEPRECATED: we can remove this when all priorityConfigs implement the
  3. // Map-Reduce pattern.
  4. for i := range priorityConfigs {
  5. if priorityConfigs[i].Function != nil {
  6. wg.Add(1)
  7. go func(index int) {
  8. 。。。
  9. // ljs:求出每个节点在配置index上的得分
  10. results[index], err = priorityConfigs[index].Function(pod, nodeNameToInfo, nodes)
  11. 。。。
  12. }(i)
  13. } else {
  14. results[i] = make(schedulerapi.HostPriorityList, len(nodes))
  15. }
  16. }
  17. // 代码2
  18. workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
  19. nodeInfo := nodeNameToInfo[nodes[index].Name]
  20. for i := range priorityConfigs {
  21. // 这个for循环遍历的是所有的优选配置,如果有老Fun就跳过,新逻辑就继续;
  22. if priorityConfigs[i].Function != nil {
  23. // 因为前面old已经运行过了,也就是priorityConfigs[i].Function
  24. // 这里是两种计算result的方法,选择其中一种就行,Function是old,map是新方法
  25. continue
  26. }
  27. var err error
  28. results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
  29. if err != nil {
  30. appendError(err)
  31. results[i][index].Host = nodes[index].Name
  32. }
  33. }
  34. })
  35. // 分割—————————— type PriorityConfig struct {
  36. Name string
  37. Map PriorityMapFunction
  38. Reduce PriorityReduceFunction
  39. // TODO: Remove it after migrating all functions to
  40. // Map-Reduce pattern.
  41. Function PriorityFunction
  42. Weight int
  43. }

上述两段代码,其实做的是同一件事,就是遍历每个优先级算法函数func,计算每个节点在这个func上的得分。只是使用的方式不一样,我们也可以从PriorityConfig的定义可以看出来,PriorityConfig.Map会取代PriorityFunction。

优先调度算法实例

优选调度算法默认代码在pkg/scheduler/algorithmprovider/defaults/defaults.go下,defaultPriorities方法返回的是默认的一系列优选算法,通过工厂模式处理相应的优选算法,代码如下:

  1. func defaultPriorities() sets.String {
  2. return sets.NewString(
  3. // spreads pods by minimizing the number of pods (belonging to the same service or replication controller) on the same node.
  4. factory.RegisterPriorityConfigFactory(
  5. "SelectorSpreadPriority",
  6. factory.PriorityConfigFactory{
  7. MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
  8. return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
  9. },
  10. Weight: 1,
  11. },
  12. ),
  13. // pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.)
  14. // as some other pods, or, conversely, should not be placed in the same topological domain as some other pods.
  15. factory.RegisterPriorityConfigFactory(
  16. "InterPodAffinityPriority",
  17. factory.PriorityConfigFactory{
  18. Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
  19. return priorities.NewInterPodAffinityPriority(args.NodeInfo, args.NodeLister, args.PodLister, args.HardPodAffinitySymmetricWeight)
  20. },
  21. Weight: 1,
  22. },
  23. ),
  24. // Prioritize nodes by least requested utilization.
  25. factory.RegisterPriorityFunction2("LeastRequestedPriority", priorities.LeastRequestedPriorityMap, nil, 1),
  26. // Prioritizes nodes to help achieve balanced resource usage
  27. factory.RegisterPriorityFunction2("BalancedResourceAllocation", priorities.BalancedResourceAllocationMap, nil, 1),
  28. // Set this weight large enough to override all other priority functions.
  29. // TODO: Figure out a better way to do this, maybe at same time as fixing #24720.
  30. factory.RegisterPriorityFunction2("NodePreferAvoidPodsPriority", priorities.CalculateNodePreferAvoidPodsPriorityMap, nil, 10000),
  31. // Prioritizes nodes that have labels matching NodeAffinity
  32. factory.RegisterPriorityFunction2("NodeAffinityPriority", priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1),
  33. // Prioritizes nodes that marked with taint which pod can tolerate.
  34. factory.RegisterPriorityFunction2("TaintTolerationPriority", priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1),
  35. // ImageLocalityPriority prioritizes nodes that have images requested by the pod present.
  36. factory.RegisterPriorityFunction2("ImageLocalityPriority", priorities.ImageLocalityPriorityMap, nil, 1),
  37. )
  38. }

Function和Map-Reduce实例分析

InterPodAffinityPriority(Function)

这个算法做的是Pod间亲和性优选,也就是亲和pod越多的节点分值越高,反亲和pod越多分值越低。我们撇开具体的亲和性计算规则,从优选函数的形式上看一下这段代码的逻辑:

  1. // 代码位置: pkg/scheduler/algorithm/priorities/interpod_affinity.go:119func (ipa *InterPodAffinity) CalculateInterPodAffinityPriority(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, nodes []*v1.Node) (schedulerapi.HostPriorityList, error) {
  2. affinity := pod.Spec.Affinity
  3. // 是否有亲和性约束;
  4. hasAffinityConstraints := affinity != nil && affinity.PodAffinity != nil
  5. // 是否有反亲和性约束;
  6. hasAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil
  7. // 这里有一段根据亲和性和反亲和性来计算一个node上匹配的pod数量的逻辑,我们先跳过这些逻辑,从优选算法实现的角度看这个算法的架子;
  8. // 当遍历完所有的node之后,可以得到1个最高分和1个最低分,分别记为maxCount和minCount;
  9. for _, node := range nodes {
  10. if pm.counts[node.Name] > maxCount {
  11. maxCount = pm.counts[node.Name]
  12. }
  13. if pm.counts[node.Name] < minCount {
  14. minCount = pm.counts[node.Name]
  15. }
  16. }
  17. // 这个result类型和前面看到的一样,都是存储单个算法的计算结果的;
  18. result := make(schedulerapi.HostPriorityList, 0, len(nodes))
  19. for _, node := range nodes {
  20. fScore := float64(0)
  21. // 如果分差大于0,也就是说不是所有的node都一样的情况,需要对分值做一个处理;
  22. if (maxCount - minCount) > 0 {
  23. // MaxPriority定义的是优选最高分10,第二个因数是当前node的count-最小count
  24. // 然后除以(maxCount - minCount);举个例子,当前node的计算结果是5,最大count20,最小
  25. // count是-3,那么这里就是10*[5-(-3)/20-(-3)]
  26. // 这个计算的结果显然会在[0-10]之间;
  27. fScore = float64(schedulerapi.MaxPriority) * ((pm.counts[node.Name] - minCount) / (maxCount - minCount))
  28. }
  29. // 如果分差不大于0,这时候int(fScore)也就是0,对于各个node的结果都是0
  30. result = append(result, schedulerapi.HostPriority{Host: node.Name, Score: int(fScore)})
  31. }
  32. return result, nil}

如上,我们可以发现最终这个函数计算出了每个node的分值,这个分值在[0-10]之间。所以说到底Function做的事情就是根据一定的规则给每个node赋一个分值,这个分值要求在[0-10]之间,然后把这个HostPriorityList返回就行。

CalculateNodeAffinityPriorityMap(Map)

这个算法和上一个类似,上一个是Pod的Affinity,这个是Node的Affinity;我们来看代码:

  1. // 代码位置:pkg/scheduler/algorithm/priorities/node_affinity.go:34func CalculateNodeAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
  2. node := nodeInfo.Node()
  3. if node == nil {
  4. return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
  5. }
  6. // default is the podspec.
  7. affinity := pod.Spec.Affinity
  8. if priorityMeta, ok := meta.(*priorityMetadata); ok {
  9. // We were able to parse metadata, use affinity from there.
  10. affinity = priorityMeta.affinity
  11. }
  12. var count int32
  13. if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
  14. // Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
  15. for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
  16. preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
  17. if preferredSchedulingTerm.Weight == 0 {
  18. continue
  19. }
  20. nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
  21. if err != nil {
  22. return schedulerapi.HostPriority{}, err
  23. }
  24. if nodeSelector.Matches(labels.Set(node.Labels)) {
  25. count += preferredSchedulingTerm.Weight
  26. }
  27. }
  28. }
  29. return schedulerapi.HostPriority{
  30. Host: node.Name,
  31. Score: int(count),
  32. }, nil}

撇开具体的亲和性计算细节,我们可以发现这个的count没有特定的规则,可能会加到10以上;另外这里的返回值是HostPriority类型,前面的Function返回了HostPriorityList类型。

map函数

  1. pkg/scheduler/algorithm/priorities/selector_spreading.go:221func (s *ServiceAntiAffinity) CalculateAntiAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
  2. var firstServiceSelector labels.Selector
  3. node := nodeInfo.Node()
  4. if node == nil {
  5. return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
  6. }
  7. priorityMeta, ok := meta.(*priorityMetadata)
  8. if ok {
  9. firstServiceSelector = priorityMeta.podFirstServiceSelector
  10. } else {
  11. firstServiceSelector = getFirstServiceSelector(pod, s.serviceLister)
  12. }
  13. // 查找给定node在给定namespace下符合selector的pod,返回值是[]*v1.Pod
  14. matchedPodsOfNode := filteredPod(pod.Namespace, firstServiceSelector, nodeInfo)
  15. return schedulerapi.HostPriority{
  16. Host: node.Name,
  17. // 返回值中Score设置成上面找到的pod的数量
  18. Score: int(len(matchedPodsOfNode)),
  19. }, nil}

这个函数比较短,可以看到在指定node上查询到匹配selector的pod越多,分值就越高。假设找到了20个,那么这里的分值就是20;假设找到的是2,那这里的分值就是2.

CalculateNodeAffinityPriorityReduce(Reduce)

和上面这个Map对应的Reduce函数其实没有单独实现,通过NormalizeReduce函数做了一个通用的Reduce处理:

  1. pkg/scheduler/algorithm/priorities/node_affinity.go:77var CalculateNodeAffinityPriorityReduce = NormalizeReduce(schedulerapi.MaxPriority, false)pkg/scheduler/algorithm/priorities/reduce.go:29func NormalizeReduce(maxPriority int, reverse bool) algorithm.PriorityReduceFunction {
  2. return func(
  3. _ *v1.Pod,
  4. _ interface{},
  5. _ map[string]*schedulercache.NodeInfo,
  6. // 注意到这个result是HostPriorityList,对应1个算法N个node的结果集
  7. result schedulerapi.HostPriorityList) error {
  8. var maxCount int
  9. // 遍历result将最高的Score赋值给maxCount;
  10. for i := range result {
  11. if result[i].Score > maxCount {
  12. maxCount = result[i].Score
  13. }
  14. }
  15. if maxCount == 0 {
  16. if reverse {
  17. for i := range result {
  18. result[i].Score = maxPriority
  19. }
  20. }
  21. return nil
  22. }
  23. for i := range result {
  24. score := result[i].Score
  25. // 举个例子:10*(5/20)
  26. score = maxPriority * score / maxCount
  27. if reverse {
  28. // 如果score是3,得到7;如果score是4,得到6,结果反转;
  29. score = maxPriority - score
  30. }
  31. result[i].Score = score
  32. }
  33. return nil
  34. }}

map-reduce小节

  • Function:一个算法一次性计算出所有node的Score,这个Score的范围是规定的[0-10];
  • Map-Reduce:一个Map算法计算1个node的Score,这个Score可以灵活处理,可能是20,可能是-3;Map过程并发进行;最终得到的结果result通过Reduce归约,将这个算法对应的所有node的分值归约为[0-10];

抢占调度

sched.scheduleOne -> sched.preempt -> func (g *genericScheduler) Preempt
详见k8s源码分析--kube-scheduler源码(二)

参考

https://juejin.im/post/5c889c2e5188257df700a732

https://www.kubernetes.org.cn/5122.html

https://www.cnblogs.com/cloudgeek/p/10561221.html

https://www.kubernetes.org.cn/5221.html

http://tang.love/2018/07/24/learning-kubernetes-source-code/

https://www.huweihuang.com/article/source-analysis/kube-scheduler/registerAlgorithmProvider/#1-applyfeaturegates

https://my.oschina.net/u/3797264/blog/2615842

https://my.oschina.net/jxcdwangtao/blog/1594348

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/563721
推荐阅读
相关标签
  

闽ICP备14008679号