赞
踩
Pod 有了 priority(优先级) 后才有优先级调度、抢占调度的说法,高优先级的 pod 可以在调度队列中排到前面,优先选择 node;另外当高优先级的 pod 找不到合适的 node 时,就会看 node 上低优先级的 pod 驱逐之后是否能够 run 起来,如果可以,那么 node 上的一个或多个低优先级的 pod 会被驱逐,然后高优先级的 pod 得以成功运行1个 node 上。今天我们分析 pod 抢占相关的代码。开始之前我们看一下和 priority 相关的2个示例配置文件:
PriorityClass 例子
apiVersion: scheduling.k8s.io/v1kind: PriorityClassmetadata: name: high-priorityvalue: 1000000globalDefault: falsedescription: "This priority class should be used for XYZ service pods only."
使用上述 PriorityClass
apiVersion: v1kind: Podmetadata: name: nginx labels: env: testspec: containers: - name: nginx image: nginx imagePullPolicy: IfNotPresent priorityClassName: high-priority
这两个文件的内容这里不解释,Pod priority 相关知识点不熟悉的小伙伴请先查阅官方文档,我们下面看调度器中和 preempt 相关的代码逻辑。
官网地址:https://kubernetes.io/docs/concepts/configuration/pod-priority-preemption/
代码调用:
sched.scheduleOne -> sched.preempt -> func (g *genericScheduler) Preempt
当通过正常的调度流程如果没有找到合适的节点(主要是预选没有合适的节点),会判断需不需要进行抢占调度,具体的代码在pkg/scheduler/scheduler.go文件下,用到的方法preempt。
- func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
- // 特性没有开启就返回 ""
- if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
- return "", nil
- }
- // 更新 pod 信息;入参和返回值都是 *v1.Pod 类型
- preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
-
- // preempt 过程,下文分析
- node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
-
- var nodeName = ""
- if node != nil {
- nodeName = node.Name
- // 更新队列中“任命pod”队列
- sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
-
- // 设置pod的Status.NominatedNodeName
- err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
- if err != nil {
- // 如果出错就从 queue 中移除
- sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
- return "", err
- }
-
- for _, victim := range victims {
- // 将要驱逐的 pod 驱逐
- if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
- return "", err
- }
- sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
- }
- }
- // Clearing nominated pods should happen outside of "if node != nil".
- // 这个清理过程在上面的if外部,我们回头从 Preempt() 的实现去理解
- for _, p := range nominatedPodsToClear {
- rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
- if rErr != nil {
- klog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
- // We do not return as this error is not critical.
- }
- }
- return nodeName, err
- }
前面看完了正常调度,再来看看Scheduler.scheduleOne方法中,如果预选/优选失败以后的PodPriority优先级调度是如何处理的,PodPriority优先级调度对应启动的方法为sched.preempt(pod, fitError),Scheduler.preempt方法中是优先级调度的逻辑。
上面 preempt() 函数中涉及到了一些值得深入看看的对象,下面我们逐个看一下这些对象的实现。
- func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
- // Scheduler may return various types of errors. Consider preemption only if
- // the error is of type FitError.
-
- //检查error是不是预选失败的error(因为优选只是选择更优的,所以只会是预选失败)
- fitError, ok := scheduleErr.(*FitError)
- if !ok || fitError == nil {
- return nil, nil, nil, nil
- }
-
- //检查cachedNodeInfoMap是否有比将要执行优先调度pod的Priority更小的pod
- if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
- klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
- return nil, nil, nil, nil
- }
- allNodes, err := nodeLister.List()
- if err != nil {
- return nil, nil, nil, err
- }
- if len(allNodes) == 0 {
- return nil, nil, nil, ErrNoNodesAvailable
- }
- // 1.获取预选调度失败的节点,但是可能是潜在的抢占可能成功的节点(所有的抢占节点都是在潜在节点内部选择)
- potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError.FailedPredicates)
- if len(potentialNodes) == 0 {
- klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
- // In this case, we should clean-up any existing nominated node name of the pod.
- return nil, nil, []*v1.Pod{pod}, nil
- }
- // 2.获取PDB(Pod中断预算)列表
- // ljs:部署在Kubernetes的每个App都可以创建一个对应PDB Object,// 用来限制Voluntary Disruptions时最大可以down的副本数或者最少应该保持Available的副本数,以此来保证应用的高可用。
- pdbs, err := g.pdbLister.List(labels.Everything())
- if err != nil {
- return nil, nil, nil, err
- }
- // 3.获取所有可以进行Preempt的Node节点的信息,主要包含该节点哪些Pod需要被抢占掉
- nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates,
- g.predicateMetaProducer, g.schedulingQueue, pdbs)
- if err != nil {
- return nil, nil, nil, err
- }
-
- // We will only check nodeToVictims with extenders that support preemption.
- // Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
- // node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
- // 4.扩展的Preempt调度判断
- nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
- if err != nil {
- return nil, nil, nil, err
- }
-
- // 5.选中某一个Node
- candidateNode := pickOneNodeForPreemption(nodeToVictims)
- if candidateNode == nil {
- return nil, nil, nil, err
- }
-
- // Lower priority pods nominated to run on this node, may no longer fit on
- // this node. So, we should remove their nomination. Removing their
- // nomination updates these pods and moves them to the active queue. It
- // lets scheduler find another place for them.
- // 6.判断哪些Pod优先级较低,后续需要被清除掉,不作为NominatedPods存在
- nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
- if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok {
- return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err
- }
-
- return nil, nil, nil, fmt.Errorf(
- "preemption failed: the target node %s has been deleted from scheduler cache",
- candidateNode.Name)
- }
Preempt方法主要执行以下几个步骤:
1、从预选失败的节点中获取可以用来做抢占调度的节点,通过一个switch语句排除不可以用来做抢占调度的节点
2、获取PDB(Pod中断预算)列表,用来做后续的判断标准;部署在Kubernetes的每个App都可以创建一个对应PDB Object,用来限制Voluntary Disruptions时最大可以down的副本数或者最少应该保持Available的副本数,以此来保证应用的高可用。
3、通过调用selectNodesForPreemption方法,判断哪些Node可以进行抢占调度。通过ParallelizeUntil方法同步对所有的Node进行判断,判断路径为checkNode-->selectVictimsOnNode-->podFitsOnNode,最终同预选方法类似,使用了podFitsOnNode方法。不同于普通预选,抢占调度会先对Pod优先级判断,然后在移除掉优先级较低的Pod之后再调用podFitsOnNode方法,以此达到抢占的效果。selectNodesForPreemption方法返回的参数是一个map类型的值,key为Node信息,value为该Node如果作为调度节点,将要清除的一些信息,包括Pods和PDB信息
4、获取到抢占调度可以实现的Nodes资源后,继续通过扩展的算法进行过滤;
5、选中最终的抢占调度的Node,调用pickOneNodeForPreemption方法,主要基于5个原则:
a)PDB violations(违规)值最小的Node;
b)挑选具有最低优先级受害者的节点,即被清除的Node上的Pods,它的优先级是最低的;
c)通过所有受害者Pods(将被删除的低优先级Pods)的优先级总和做区分;
d)如果多个Node优先级总和仍然相等,则选择具有最小受害者数量的Node;
e)如果多个Node优先级总和仍然相等,则选择第一个这样的Node(随机排序);
6、选中最终的Node之后,记录该Node上优先级较低的NominatedPods,这些Pod还未调度,需要将其调度关系进行删除,重新应用。
genericScheduler.Preempt
|-->nodesWherePreemptionMightHelp
|-->selectNodesForPreemption
|-->selectVictimsOnNode
|-->filterPodsWithPDBViolation
|-->pickOneNodeForPreemption
|-->getLowerPriorityNominatedPods
nodesWherePreemptionMightHelp 要做的事情是寻找 predicates 阶段失败但是通过抢占也许能够调度成功的 nodes.
- func nodesWherePreemptionMightHelp(nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
- // 潜力 node, 用于存储返回值的 slice
- potentialNodes := []*v1.Node{}
- for _, node := range nodes {
- // 这个为 true 表示一个 node 驱逐 pod 也不一定能适合当前 pod 运行
- unresolvableReasonExist := false
- // 一个 node 对应的所有失败的 predicates
- failedPredicates, _ := failedPredicatesMap[node.Name]
- // 遍历,看是不是再下面指定的这些原因中,如果在,就标记 unresolvableReasonExist = true
- for _, failedPredicate := range failedPredicates {
- switch failedPredicate {
- case
- predicates.ErrNodeSelectorNotMatch,
- predicates.ErrPodAffinityRulesNotMatch,
- predicates.ErrPodNotMatchHostName,
- predicates.ErrTaintsTolerationsNotMatch,
- predicates.ErrNodeLabelPresenceViolated,
- predicates.ErrNodeNotReady,
- predicates.ErrNodeNetworkUnavailable,
- predicates.ErrNodeUnderDiskPressure,
- predicates.ErrNodeUnderPIDPressure,
- predicates.ErrNodeUnderMemoryPressure,
- predicates.ErrNodeOutOfDisk,
- predicates.ErrNodeUnschedulable,
- predicates.ErrNodeUnknownCondition,
- predicates.ErrVolumeZoneConflict,
- predicates.ErrVolumeNodeConflict,
- predicates.ErrVolumeBindConflict:
- unresolvableReasonExist = true
- // 如果找到一个上述失败原因,说明这个 node 已经可以排除了,break 后继续下一个 node 的计算
- break
- }
- }
- // false 的时候,也就是这个 node 也许驱逐 pods 后有用,那就添加到 potentialNodes 中
- if !unresolvableReasonExist {
- klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
- potentialNodes = append(potentialNodes, node)
- }
- }
- return potentialNodes
- }
这个函数会并发计算所有的 nodes 是否通过驱逐实现 pod 抢占。
- func selectNodesForPreemption(pod *v1.Pod,
- nodeNameToInfo map[string]*schedulercache.NodeInfo,
- potentialNodes []*v1.Node, // 上一个函数计算出来的 nodes
- predicates map[string]algorithm.FitPredicate,
- metadataProducer algorithm.PredicateMetadataProducer,
- queue internalqueue.SchedulingQueue, // 这里其实是前面讲的优先级队列 PriorityQueue
- pdbs []*policy.PodDisruptionBudget, // pdb 列表) (map[*v1.Node]*schedulerapi.Victims, error) {
- nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
- var resultLock sync.Mutex
-
- // We can use the same metadata producer for all nodes.
- meta := metadataProducer(pod, nodeNameToInfo)
- // 这种形式的并发已经不陌生了,前面遇到过几次了
- checkNode := func(i int) {
- nodeName := potentialNodes[i].Name
- var metaCopy algorithm.PredicateMetadata
- if meta != nil {
- metaCopy = meta.ShallowCopy()
- }
- // 这里有一个子过程调用,下面单独介绍
- pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue, pdbs)
- if fits {
- resultLock.Lock()
- victims := schedulerapi.Victims{
- Pods: pods,
- NumPDBViolations: numPDBViolations,
- }
- // 如果 fit,就添加到 nodeToVictims 中,也就是最后的返回值
- nodeToVictims[potentialNodes[i]] = &victims
- resultLock.Unlock()
- }
- }
- workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
- return nodeToVictims, nil}
这个函数尝试在给定的 node 中寻找最少数量的需要被驱逐的 pods,同时需要保证驱逐了这些 pods 之后,这个 noode 能够满足“pod”运行需求。
- func selectVictimsOnNode(
- pod *v1.Pod,
- meta algorithm.PredicateMetadata,
- nodeInfo *schedulercache.NodeInfo,
- fitPredicates map[string]algorithm.FitPredicate,
- queue internalqueue.SchedulingQueue,
- pdbs []*policy.PodDisruptionBudget,) ([]*v1.Pod, int, bool) {
- if nodeInfo == nil {
- return nil, 0, false
- }
- // 排个序
- potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
- nodeInfoCopy := nodeInfo.Clone()
-
- // 定义删除 pod 函数
- removePod := func(rp *v1.Pod) {
- nodeInfoCopy.RemovePod(rp)
- if meta != nil {
- meta.RemovePod(rp)
- }
- }
- // 定义添加 pod 函数
- addPod := func(ap *v1.Pod) {
- nodeInfoCopy.AddPod(ap)
- if meta != nil {
- meta.AddPod(ap, nodeInfoCopy)
- }
- }
- // 删除所有的低优先级 pod 看是不是能够满足调度需求了
- podPriority := util.GetPodPriority(pod)
- for _, p := range nodeInfoCopy.Pods() {
- if util.GetPodPriority(p) < podPriority {
- // 删除的意思其实就是添加元素到 potentialVictims.Items
- potentialVictims.Items = append(potentialVictims.Items, p)
- removePod(p)
- }
- }
- // 排个序
- potentialVictims.Sort()
- // 如果删除了所有的低优先级 pods 之后还不能跑这个新 pod,那么差不多就可以判断这个 node 不适合 preemption 了,还有一点点需要考虑的是这个“pod”的不 fit 的原因是由于 pod affinity 不满足了。
- // 后续可能会增加当前 pod 和低优先级 pod 之间的 优先级检查。
-
- // 这个函数调用其实就是之前讲到过的预选函数的调用逻辑,判断这个 pod 是否合适跑在这个 node 上。
- if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil); !fits {
- if err != nil {
- klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
- }
- return nil, 0, false
- }
- var victims []*v1.Pod
- numViolatingVictim := 0
-
- // 对前两步删除pod,检查是否会导致pod的数量小于pdb的min-avilable,分为会和不会两类
- violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)
- // 释放 pods 的函数,来一个放一个
- reprievePod := func(p *v1.Pod) bool {
- addPod(p)
- fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil, queue, false, nil)
- if !fits {
- removePod(p)
- victims = append(victims, p)
- klog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", p.Name, nodeInfo.Node().Name)
- }
- return fits
- }
- // 尝试尽量多地释放这些 pods,也就是说能少杀就少杀;这里先从 PDB violating victims 中释放,再从 PDB non-violating victims 中释放;两个组都是从高优先级的 pod 开始释放。
- // 释放 violatingVictims 中元素的同时会记录放了多少个
- for _, p := range violatingVictims {
- if !reprievePod(p) {
- numViolatingVictim++
- }
- }
- // 开始释放 non-violating victims.
- for _, p := range nonViolatingVictims {
- reprievePod(p)
- }
- return victims, numViolatingVictim, true}
selectVictimsOnNode选出node给优先调度pod腾出资源要需要删除的pod(最少数量的),以及对删除pod而导致pod少于pdb定义进行计数。流程如下:
- func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*schedulerapi.Victims) *v1.Node {
- if len(nodesToVictims) == 0 {
- return nil
- }
- // 初始化为最大值
- minNumPDBViolatingPods := math.MaxInt32
- var minNodes1 []*v1.Node
- lenNodes1 := 0
- // 这个循环要找到 PDBViolatingPods 最少的 node,如果有多个,就全部存在 minNodes1 中
- for node, victims := range nodesToVictims {
- if len(victims.Pods) == 0 {
- // 如果发现一个不需要驱逐 pod 的 node,马上返回
- return node
- }
- numPDBViolatingPods := victims.NumPDBViolations
- if numPDBViolatingPods < minNumPDBViolatingPods {
- minNumPDBViolatingPods = numPDBViolatingPods
- minNodes1 = nil
- lenNodes1 = 0
- }
- if numPDBViolatingPods == minNumPDBViolatingPods {
- minNodes1 = append(minNodes1, node)
- lenNodes1++
- }
- }
- // 如果只找到1个 PDB violations 最少的 node,那就直接返回这个 node 就 ok 了
- if lenNodes1 == 1 {
- return minNodes1[0]
- }
-
- // 还剩下多个 node,那就寻找 highest priority victim 最小的 node
- minHighestPriority := int32(math.MaxInt32)
- var minNodes2 = make([]*v1.Node, lenNodes1)
- lenNodes2 := 0
- // 这个循环要做的事情是看2个 node 上 victims 中最高优先级的 pod 哪个优先级更高
- for i := 0; i < lenNodes1; i++ {
- node := minNodes1[i]
- victims := nodesToVictims[node]
- // highestPodPriority is the highest priority among the victims on this node.
- highestPodPriority := util.GetPodPriority(victims.Pods[0])
- if highestPodPriority < minHighestPriority {
- minHighestPriority = highestPodPriority
- lenNodes2 = 0
- }
- if highestPodPriority == minHighestPriority {
- minNodes2[lenNodes2] = node
- lenNodes2++
- }
- }
- // 发现只有1个,那就直接返回
- if lenNodes2 == 1 {
- return minNodes2[0]
- }
-
- // 这时候还没有抉择出一个 node,那就开始计算优先级总和了,看哪个更低
- minSumPriorities := int64(math.MaxInt64)
- lenNodes1 = 0
- for i := 0; i < lenNodes2; i++ {
- var sumPriorities int64
- node := minNodes2[i]
- for _, pod := range nodesToVictims[node].Pods {
- // 这里的累加考虑到了先把优先级搞成正数。不然会出现1个 node 上有1优先级为 -3 的 pod,另外一个 node 上有2个优先级为 -3 的 pod,结果 -3>-6,有2个 pod 的 node 反而被认为总优先级更低!
- sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
- }
- if sumPriorities < minSumPriorities {
- minSumPriorities = sumPriorities
- lenNodes1 = 0
- }
- if sumPriorities == minSumPriorities {
- minNodes1[lenNodes1] = node
- lenNodes1++
- }
- }
- if lenNodes1 == 1 {
- return minNodes1[0]
- }
-
- // 还是没有分出胜负,于是开始用 pod 总数做比较
- minNumPods := math.MaxInt32
- lenNodes2 = 0
- for i := 0; i < lenNodes1; i++ {
- node := minNodes1[i]
- numPods := len(nodesToVictims[node].Pods)
- if numPods < minNumPods {
- minNumPods = numPods
- lenNodes2 = 0
- }
- if numPods == minNumPods {
- minNodes2[lenNodes2] = node
- lenNodes2++
- }
- }
- // 还是没有区分出来1个 node 的话,只能放弃区分了,直接返回第一个结果
- if lenNodes2 > 0 {
- return minNodes2[0]
- }
- klog.Errorf("Error in logic of node scoring for preemption. We should never reach here!")
- return nil}
pickOneNodeForPreemption从可以进行优先调度的node中选出最优的一个node。
https://www.kubernetes.org.cn/5221.html
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。