当前位置:   article > 正文

k8s调度原理以及自定义调度器_scheduling-framework自定义调度器

scheduling-framework自定义调度器

kube-scheduler 是 kubernetes 的核心组件之一,主要负责整个集群资源的调度功能,根据特定的调度算法和策略,将 Pod 调度到最优的工作节点上面去,从而更加合理、更加充分的利用集群的资源,这也是我们选择使用 kubernetes 一个非常重要的理由。如果一门新的技术不能帮助企业节约成本、提供效率,我相信是很难推进的。

1. 调度流程

默认情况下,kube-scheduler 提供的默认调度器能够满足我们绝大多数的要求,我们前面和大家接触的示例也基本上用的默认的策略,都可以保证我们的 Pod 可以被分配到资源充足的节点上运行。但是在实际的线上项目中,可能我们自己会比 kubernetes 更加了解我们自己的应用,比如我们希望一个 Pod 只能运行在特定的几个节点上,或者这几个节点只能用来运行特定类型的应用,这就需要我们的调度器能够可控。

kube-scheduler 的主要作用就是根据特定的调度算法和调度策略将 Pod 调度到合适的 Node 节点上去,是一个独立的二进制程序,启动之后会一直监听 API Server,获取到 PodSpec.NodeName 为空的 Pod,对每个 Pod 都会创建一个 binding。

这个过程在我们看来好像比较简单,但在实际的生产环境中,需要考虑的问题就有很多了:

  • 如何保证全部的节点调度的公平性?要知道并不是所有节点资源配置一定都是一样的

  • 如何保证每个节点都能被分配资源?

  • 集群资源如何能够被高效利用?

  • 集群资源如何才能被最大化使用?

  • 如何保证 Pod 调度的性能和效率?

  • 用户是否可以根据自己的实际需求定制自己的调度策略?

考虑到实际环境中的各种复杂情况,kubernetes 的调度器采用插件化的形式实现,可以方便用户进行定制或者二次开发,我们可以自定义一个调度器并以插件形式和 kubernetes 进行集成。

kubernetes 调度器的源码位于 kubernetes/pkg/scheduler 中,大体的代码目录结构如下所示:(不同的版本目录结构可能不太一样)

  1. kubernetes/pkg/scheduler
  2. -- scheduler.go //调度相关的具体实现
  3. |-- algorithm
  4. | |-- predicates //节点筛选策略
  5. | |-- priorities //节点打分策略
  6. |-- algorithmprovider
  7. | |-- defaults //定义默认的调度器

2. 自定义调度器说明

一般来说,我们有 4 种扩展 Kubernetes 调度器的方法。

  • 一种方法就是直接 clone 官方的 kube-scheduler 源代码,在合适的位置直接修改代码,然后重新编译运行修改后的程序,当然这种方法是最不建议使用的,也不实用,因为需要花费大量额外的精力来和上游的调度程序更改保持一致。

  • 第二种方法就是和默认的调度程序一起运行独立的调度程序,默认的调度器和我们自定义的调度器可以通过 Pod 的 spec.schedulerName 来覆盖各自的 Pod,默认是使用 default 默认的调度器,但是多个调度程序共存的情况下也比较麻烦,比如当多个调度器将 Pod 调度到同一个节点的时候,可能会遇到一些问题,因为很有可能两个调度器都同时将两个 Pod 调度到同一个节点上去,但是很有可能其中一个 Pod 运行后其实资源就消耗完了,并且维护一个高质量的自定义调度程序也不是很容易的,因为我们需要全面了解默认的调度程序,整体 Kubernetes 的架构知识以及各种 Kubernetes API 对象的各种关系或限制。

  • 第三种方法是调度器扩展程序,这个方案目前是一个可行的方案,可以和上游调度程序兼容,所谓的调度器扩展程序其实就是一个可配置的 Webhook 而已,里面包含 过滤器优先级 两个端点,分别对应调度周期中的两个主要阶段(过滤和打分)。

  • 第四种方法是通过调度框架(Scheduling Framework),Kubernetes v1.15 版本中引入了可插拔架构的调度框架,使得定制调度器这个任务变得更加的容易。调库框架向现有的调度器中添加了一组插件化的 API,该 API 在保持调度程序“核心”简单且易于维护的同时,使得大部分的调度功能以插件的形式存在,而且在我们现在的 v1.16 版本中上面的 调度器扩展程序 也已经被废弃了,所以以后调度框架才是自定义调度器的核心方式。

这里我们可以简单介绍下后面两种方式的实现。

3. 调度器扩展程序

在进入调度器扩展程序之前,我们再来了解下 Kubernetes 调度程序是如何工作的:

  1. 默认调度器根据指定的参数启动(我们使用 kubeadm 搭建的集群,启动配置文件位于 /etc/kubernetes/manifests/kube-schdueler.yaml

  2. watch apiserver,将 spec.nodeName 为空的 Pod 放入调度器内部的调度队列中

  3. 从调度队列中 Pop 出一个 Pod,开始一个标准的调度周期

  4. 从 Pod 属性中检索“硬性要求”(比如 CPU/内存请求值,nodeSelector/nodeAffinity),然后过滤阶段发生,在该阶段计算出满足要求的节点候选列表

  5. 从 Pod 属性中检索“软需求”,并应用一些默认的“软策略”(比如 Pod 倾向于在节点上更加聚拢或分散),最后,它为每个候选节点给出一个分数,并挑选出得分最高的最终获胜者

  6. 和 apiserver 通信(发送绑定调用),然后设置 Pod 的 spec.nodeName 属性以表示将该 Pod 调度到的节点。

我们可以通过查看官方文档,可以通过 --config 参数指定调度器将使用哪些参数,该配置文件应该包含一个 KubeSchedulerConfiguration 对象,如下所示格式:(/etc/kubernetes/scheduler-extender.yaml)

  1. # 通过"--config" 传递文件内容
  2. apiVersion: kubescheduler.config.k8s.io/v1alpha1
  3. kind: KubeSchedulerConfiguration
  4. clientConnection:
  5. kubeconfig: "/etc/kubernetes/scheduler.conf"
  6. algorithmSource:
  7. policy:
  8. file:
  9. path: "/etc/kubernetes/scheduler-extender-policy.yaml" # 指定自定义调度策略文件

我们在这里应该输入的关键参数是 algorithmSource.policy,这个策略文件可以是本地文件也可以是 ConfigMap 资源对象,这取决于调度程序的部署方式,比如我们这里默认的调度器是静态 Pod 方式启动的,所以我们可以用本地文件的形式来配置。

该策略文件 /etc/kubernetes/scheduler-extender-policy.yaml 应该遵循 kubernetes/pkg/scheduler/apis/config/legacy_types.go#L28 的要求,在我们这里的 v1.16.2 版本中已经支持 JSON 和 YAML 两种格式的策略文件,下面是我们定义的一个简单的示例,可以查看 Extender 描述了解策略文件的定义规范:

  1. apiVersion: v1
  2. kind: Policy
  3. extenders:
  4. - urlPrefix: "http://127.0.0.1:8888/"
  5. filterVerb: "filter"
  6. prioritizeVerb: "prioritize"
  7. weight: 1
  8. enableHttps: false

我们这里的 Policy 策略文件是通过定义 extenders 来扩展调度器的,有时候我们不需要去编写代码,可以直接在该配置文件中通过指定 predicatespriorities 来进行自定义,如果没有指定则会使用默认的 DefaultProvier

  1. {
  2. "kind": "Policy",
  3. "apiVersion": "v1",
  4. "predicates": [
  5. {
  6. "name": "MatchNodeSelector"
  7. },
  8. {
  9. "name": "PodFitsResources"
  10. },
  11. {
  12. "name": "PodFitsHostPorts"
  13. },
  14. {
  15. "name": "HostName"
  16. }
  17. ],
  18. "priorities": [
  19. {
  20. "name": "EqualPriority",
  21. "weight": 2
  22. },
  23. {
  24. "name": "ImageLocalityPriority",
  25. "weight": 4
  26. },
  27. {
  28. "name": "LeastRequestedPriority",
  29. "weight": 2
  30. },
  31. {
  32. "name": "BalancedResourceAllocation",
  33. "weight": 2
  34. }
  35. ],
  36. "extenders": [
  37. {
  38. "urlPrefix": "/prefix",
  39. "filterVerb": "filter",
  40. "prioritizeVerb": "prioritize",
  41. "weight": 1,
  42. "bindVerb": "bind",
  43. "enableHttps": false
  44. }
  45. ]
  46. }

改策略文件定义了一个 HTTP 的扩展程序服务,该服务运行在 127.0.0.1:8888 下面,并且已经将该策略注册到了默认的调度器中,这样在过滤和打分阶段结束后,可以将结果分别传递给该扩展程序的端点 <urlPrefix>/<filterVerb><urlPrefix>/<prioritizeVerb>,在扩展程序中,我们可以进一步过滤并确定优先级,以适应我们的特定业务需求。

示例

我们直接用 golang 来实现一个简单的调度器扩展程序,当然你可以使用其他任何编程语言,如下所示:

  1. func main() {
  2. router := httprouter.New()
  3. router.GET("/", Index)
  4. router.POST("/filter", Filter)
  5. router.POST("/prioritize", Prioritize)
  6. log.Fatal(http.ListenAndServe(":8888", router))
  7. }

然后接下来我们需要实现 /filter/prioritize 两个端点的处理程序。

其中 Filter 这个扩展函数接收一个输入类型为 schedulerapi.ExtenderArgs 的参数,然后返回一个类型为 *schedulerapi.ExtenderFilterResult 的值。在函数中,我们可以进一步过滤输入的节点:

  1. // filter 根据扩展程序定义的预选规则来过滤节点
  2. func filter(args schedulerapi.ExtenderArgs) *schedulerapi.ExtenderFilterResult {
  3. var filteredNodes []v1.Node
  4. failedNodes := make(schedulerapi.FailedNodesMap)
  5. pod := args.Pod
  6. for _, node := range args.Nodes.Items {
  7. fits, failReasons, _ := podFitsOnNode(pod, node)
  8. if fits {
  9. filteredNodes = append(filteredNodes, node)
  10. } else {
  11. failedNodes[node.Name] = strings.Join(failReasons, ",")
  12. }
  13. }
  14. result := schedulerapi.ExtenderFilterResult{
  15. Nodes: &v1.NodeList{
  16. Items: filteredNodes,
  17. },
  18. FailedNodes: failedNodes,
  19. Error: "",
  20. }
  21. return &result
  22. }

在过滤函数中,我们循环每个节点然后用我们自己实现的业务逻辑来判断是否应该批准该节点,这里我们实现比较简单,在 podFitsOnNode() 函数中我们只是简单的检查随机数是否为偶数来判断即可,如果是的话我们就认为这是一个幸运的节点,否则拒绝批准该节点。

  1. var predicatesSorted = []string{LuckyPred}
  2. var predicatesFuncs = map[string]FitPredicate{
  3. LuckyPred: LuckyPredicate,
  4. }
  5. type FitPredicate func(pod *v1.Pod, node v1.Node) (bool, []string, error)
  6. func podFitsOnNode(pod *v1.Pod, node v1.Node) (bool, []string, error) {
  7. fits := true
  8. var failReasons []string
  9. for _, predicateKey := range predicatesSorted {
  10. fit, failures, err := predicatesFuncs[predicateKey](pod, node)
  11. if err != nil {
  12. return false, nil, err
  13. }
  14. fits = fits && fit
  15. failReasons = append(failReasons, failures...)
  16. }
  17. return fits, failReasons, nil
  18. }
  19. func LuckyPredicate(pod *v1.Pod, node v1.Node) (bool, []string, error) {
  20. lucky := rand.Intn(2) == 0
  21. if lucky {
  22. log.Printf("pod %v/%v is lucky to fit on node %v\n", pod.Name, pod.Namespace, node.Name)
  23. return true, nil, nil
  24. }
  25. log.Printf("pod %v/%v is unlucky to fit on node %v\n", pod.Name, pod.Namespace, node.Name)
  26. return false, []string{LuckyPredFailMsg}, nil
  27. }

同样的打分功能用同样的方式来实现,我们在每个节点上随机给出一个分数:

  1. // it's webhooked to pkg/scheduler/core/generic_scheduler.go#PrioritizeNodes()
  2. // 这个函数输出的分数会被添加会默认的调度器
  3. func prioritize(args schedulerapi.ExtenderArgs) *schedulerapi.HostPriorityList {
  4. pod := args.Pod
  5. nodes := args.Nodes.Items
  6. hostPriorityList := make(schedulerapi.HostPriorityList, len(nodes))
  7. for i, node := range nodes {
  8. score := rand.Intn(schedulerapi.MaxPriority + 1) // 在最大优先级内随机取一个值
  9. log.Printf(luckyPrioMsg, pod.Name, pod.Namespace, score)
  10. hostPriorityList[i] = schedulerapi.HostPriority{
  11. Host: node.Name,
  12. Score: score,
  13. }
  14. }
  15. return &hostPriorityList
  16. }

然后我们可以使用下面的命令来编译打包我们的应用:

$ GOOS=linux GOARCH=amd64 go build -o app

本节调度器扩展程序完整的代码获取地址:https://github.com/cnych/sample-scheduler-extender。

构建完成后,将应用 app 拷贝到 kube-scheduler 所在的节点直接运行即可。现在我们就可以将上面的策略文件配置到 kube-scheduler 组件中去了,我们这里集群是 kubeadm 搭建的,所以直接修改文件 /etc/kubernetes/manifests/kube-schduler.yaml 文件即可,内容如下所示:

  1. apiVersion: v1
  2. kind: Pod
  3. metadata:
  4. creationTimestamp: null
  5. labels:
  6. component: kube-scheduler
  7. tier: control-plane
  8. name: kube-scheduler
  9. namespace: kube-system
  10. spec:
  11. containers:
  12. - command:
  13. - kube-scheduler
  14. - --authentication-kubeconfig=/etc/kubernetes/scheduler.conf
  15. - --authorization-kubeconfig=/etc/kubernetes/scheduler.conf
  16. - --bind-address=127.0.0.1
  17. - --kubeconfig=/etc/kubernetes/scheduler.conf
  18. - --leader-elect=true
  19. - --config=/etc/kubernetes/scheduler-extender.yaml
  20. - --v=9
  21. image: gcr.azk8s.cn/google_containers/kube-scheduler:v1.16.2
  22. imagePullPolicy: IfNotPresent
  23. livenessProbe:
  24. failureThreshold: 8
  25. httpGet:
  26. host: 127.0.0.1
  27. path: /healthz
  28. port: 10251
  29. scheme: HTTP
  30. initialDelaySeconds: 15
  31. timeoutSeconds: 15
  32. name: kube-scheduler
  33. resources:
  34. requests:
  35. cpu: 100m
  36. volumeMounts:
  37. - mountPath: /etc/kubernetes/scheduler.conf
  38. name: kubeconfig
  39. readOnly: true
  40. - mountPath: /etc/kubernetes/scheduler-extender.yaml
  41. name: extender
  42. readOnly: true
  43. - mountPath: /etc/kubernetes/scheduler-extender-policy.yaml
  44. name: extender-policy
  45. readOnly: true
  46. hostNetwork: true
  47. priorityClassName: system-cluster-critical
  48. volumes:
  49. - hostPath:
  50. path: /etc/kubernetes/scheduler.conf
  51. type: FileOrCreate
  52. name: kubeconfig
  53. - hostPath:
  54. path: /etc/kubernetes/scheduler-extender.yaml
  55. type: FileOrCreate
  56. name: extender
  57. - hostPath:
  58. path: /etc/kubernetes/scheduler-extender-policy.yaml
  59. type: FileOrCreate
  60. name: extender-policy
  61. status: {}

当然我们这个地方是直接在系统默认的 kube-scheduler 上面配置的,我们也可以复制一个调度器的 YAML 文件然后更改下 schedulerName 来部署,这样就不会影响默认的调度器了,然后在需要使用这个测试的调度器的 Pod 上面指定 spec.schedulerName 即可。对于多调度器的使用可以查看官方文档 配置多个调度器

kube-scheduler 重新配置后可以查看日志来验证是否重启成功,需要注意的是一定需要将 /etc/kubernetes/scheduler-extender.yaml/etc/kubernetes/scheduler-extender-policy.yaml 两个文件挂载到 Pod 中去:

  1. $ kubectl logs -f kube-scheduler-ydzs-master -n kube-system
  2. I0102 15:17:38.824657 1 serving.go:319] Generated self-signed cert in-memory
  3. I0102 15:17:39.472276 1 server.go:143] Version: v1.16.2
  4. I0102 15:17:39.472674 1 defaults.go:91] TaintNodesByCondition is enabled, PodToleratesNodeTaints predicate is mandatory
  5. W0102 15:17:39.479704 1 authorization.go:47] Authorization is disabled
  6. W0102 15:17:39.479733 1 authentication.go:79] Authentication is disabled
  7. I0102 15:17:39.479777 1 deprecated_insecure_serving.go:51] Serving healthz insecurely on [::]:10251
  8. I0102 15:17:39.480559 1 secure_serving.go:123] Serving securely on 127.0.0.1:10259
  9. I0102 15:17:39.682180 1 leaderelection.go:241] attempting to acquire leader lease kube-system/kube-scheduler...
  10. I0102 15:17:56.500505 1 leaderelection.go:251] successfully acquired lease kube-system/kube-scheduler

到这里我们就创建并配置了一个非常简单的调度扩展程序,现在我们来运行一个 Deployment 查看其工作原理,我们准备一个包含 20 个副本的部署 Yaml:(test-scheduler.yaml)

  1. apiVersion: apps/v1
  2. kind: Deployment
  3. metadata:
  4. name: pause
  5. spec:
  6. replicas: 20
  7. selector:
  8. matchLabels:
  9. app: pause
  10. template:
  11. metadata:
  12. labels:
  13. app: pause
  14. spec:
  15. containers:
  16. - name: pause
  17. image: gcr.azk8s.cn/google_containers/pause:3.1

直接创建上面的资源对象:

  1. $ kuectl apply -f test-scheduler.yaml
  2. deployment.apps/pause created

这个时候我们去查看下我们编写的调度器扩展程序日志:

  1. $ ./app
  2. ......
  3. 2020/01/03 12:27:29 pod pause-58584fbc95-bwn7t/default is unlucky to fit on node ydzs-node1
  4. 2020/01/03 12:27:29 pod pause-58584fbc95-bwn7t/default is lucky to get score 7
  5. 2020/01/03 12:27:29 pod pause-58584fbc95-bwn7t/default is lucky to get score 9
  6. 2020/01/03 12:27:29 pod pause-58584fbc95-86w92/default is unlucky to fit on node ydzs-node3
  7. 2020/01/03 12:27:29 pod pause-58584fbc95-86w92/default is unlucky to fit on node ydzs-node4
  8. 2020/01/03 12:27:29 pod pause-58584fbc95-86w92/default is lucky to fit on node ydzs-node1
  9. 2020/01/03 12:27:29 pod pause-58584fbc95-86w92/default is lucky to fit on node ydzs-node2
  10. 2020/01/03 12:27:29 pod pause-58584fbc95-86w92/default is lucky to get score 4
  11. 2020/01/03 12:27:29 pod pause-58584fbc95-86w92/default is lucky to get score 8
  12. ......

我们可以看到 Pod 调度的过程,另外默认调度程序会定期重试失败的 Pod,因此它们将一次又一次地重新传递到我们的调度扩展程序上,我们的逻辑是检查随机数是否为偶数,所以最终所有 Pod 都将处于运行状态。

调度器扩展程序可能是在一些情况下可以满足我们的需求,但是他仍然有一些限制和缺点:

  • 通信成本:数据在默认调度程序和调度器扩展程序之间以 http(s)传输,在执行序列化和反序列化的时候有一定成本

  • 有限的扩展点:扩展程序只能在某些阶段的末尾参与,例如“ Filter”“ Prioritize”,它们不能在任何阶段的开始或中间被调用

  • 减法优于加法:与默认调度程序传递的节点候选列表相比,我们可能有一些需求需要添加新的候选节点列表,但这是比较冒险的操作,因为不能保证新节点可以通过其他要求,所以,调度器扩展程序最好执行“减法”(进一步过滤),而不是“加法”(添加节点)

  • 缓存共享:上面只是一个简单的测试示例,但在真实的项目中,我们是需要通过查看整个集群的状态来做出调度决策的,默认调度程序可以很好地调度决策,但是无法共享其缓存,这意味着我们必须构建和维护自己的缓存

由于这些局限性,Kubernetes 调度小组就提出了上面第四种方法来进行更好的扩展,也就是调度框架(Scheduler Framework),它基本上可以解决我们遇到的所有难题,现在也已经成官方推荐的扩展方式,所以这将是以后扩展调度器的最主流的方式

4. 调度框架

调度框架定义了一组扩展点,用户可以实现扩展点定义的接口来定义自己的调度逻辑(我们称之为扩展),并将扩展注册到扩展点上,调度框架在执行调度工作流时,遇到对应的扩展点时,将调用用户注册的扩展。调度框架在预留扩展点时,都是有特定的目的,有些扩展点上的扩展可以改变调度程序的决策方法,有些扩展点上的扩展只是发送一个通知。

我们知道每当调度一个 Pod 时,都会按照两个过程来执行:调度过程和绑定过程。

调度过程为 Pod 选择一个合适的节点,绑定过程则将调度过程的决策应用到集群中(也就是在被选定的节点上运行 Pod),将调度过程和绑定过程合在一起,称之为调度上下文(scheduling context)。需要注意的是调度过程是同步运行的(同一时间点只为一个 Pod 进行调度),绑定过程可异步运行(同一时间点可并发为多个 Pod 执行绑定)。

调度过程和绑定过程遇到如下情况时会中途退出:

  • 调度程序认为当前没有该 Pod 的可选节点

  • 内部错误

这个时候,该 Pod 将被放回到 待调度队列,并等待下次重试。

扩展点(Extension Points)

下图展示了调度框架中的调度上下文及其中的扩展点,一个扩展可以注册多个扩展点,以便可以执行更复杂的有状态的任务。

  1. QueueSort 扩展用于对 Pod 的待调度队列进行排序,以决定先调度哪个 Pod,QueueSort 扩展本质上只需要实现一个方法 Less(Pod1, Pod2) 用于比较两个 Pod 谁更优先获得调度即可,同一时间点只能有一个 QueueSort 插件生效。

  2. Pre-filter 扩展用于对 Pod 的信息进行预处理,或者检查一些集群或 Pod 必须满足的前提条件,如果 pre-filter 返回了 error,则调度过程终止。

  3. Filter 扩展用于排除那些不能运行该 Pod 的节点,对于每一个节点,调度器将按顺序执行 filter 扩展;如果任何一个 filter 将节点标记为不可选,则余下的 filter 扩展将不会被执行。调度器可以同时对多个节点执行 filter 扩展。

  4. Post-filter 是一个通知类型的扩展点,调用该扩展的参数是 filter 阶段结束后被筛选为可选节点的节点列表,可以在扩展中使用这些信息更新内部状态,或者产生日志或 metrics 信息。

  5. Scoring 扩展用于为所有可选节点进行打分,调度器将针对每一个节点调用 Soring 扩展,评分结果是一个范围内的整数。在 normalize scoring 阶段,调度器将会把每个 scoring 扩展对具体某个节点的评分结果和该扩展的权重合并起来,作为最终评分结果。

  6. Normalize scoring 扩展在调度器对节点进行最终排序之前修改每个节点的评分结果,注册到该扩展点的扩展在被调用时,将获得同一个插件中的 scoring 扩展的评分结果作为参数,调度框架每执行一次调度,都将调用所有插件中的一个 normalize scoring 扩展一次。

  7. Reserve 是一个通知性质的扩展点,有状态的插件可以使用该扩展点来获得节点上为 Pod 预留的资源,该事件发生在调度器将 Pod 绑定到节点之前,目的是避免调度器在等待 Pod 与节点绑定的过程中调度新的 Pod 到节点上时,发生实际使用资源超出可用资源的情况。(因为绑定 Pod 到节点上是异步发生的)。这是调度过程的最后一个步骤,Pod 进入 reserved 状态以后,要么在绑定失败时触发 Unreserve 扩展,要么在绑定成功时,由 Post-bind 扩展结束绑定过程。

  8. Permit 扩展用于阻止或者延迟 Pod 与节点的绑定。Permit 扩展可以做下面三件事中的一项:

    • approve(批准):当所有的 permit 扩展都 approve 了 Pod 与节点的绑定,调度器将继续执行绑定过程

    • deny(拒绝):如果任何一个 permit 扩展 deny 了 Pod 与节点的绑定,Pod 将被放回到待调度队列,此时将触发 Unreserve 扩展

    • wait(等待):如果一个 permit 扩展返回了 wait,则 Pod 将保持在 permit 阶段,直到被其他扩展 approve,如果超时事件发生,wait 状态变成 deny,Pod 将被放回到待调度队列,此时将触发 Unreserve 扩展

  9. Pre-bind 扩展用于在 Pod 绑定之前执行某些逻辑。例如,pre-bind 扩展可以将一个基于网络的数据卷挂载到节点上,以便 Pod 可以使用。如果任何一个 pre-bind 扩展返回错误,Pod 将被放回到待调度队列,此时将触发 Unreserve 扩展。

  10. Bind 扩展用于将 Pod 绑定到节点上:

    • 只有所有的 pre-bind 扩展都成功执行了,bind 扩展才会执行

    • 调度框架按照 bind 扩展注册的顺序逐个调用 bind 扩展

    • 具体某个 bind 扩展可以选择处理或者不处理该 Pod

    • 如果某个 bind 扩展处理了该 Pod 与节点的绑定,余下的 bind 扩展将被忽略

  11. Post-bind 是一个通知性质的扩展:

    • Post-bind 扩展在 Pod 成功绑定到节点上之后被动调用

    • Post-bind 扩展是绑定过程的最后一个步骤,可以用来执行资源清理的动作

  12. Unreserve 是一个通知性质的扩展,如果为 Pod 预留了资源,Pod 又在被绑定过程中被拒绝绑定,则 unreserve 扩展将被调用。Unreserve 扩展应该释放已经为 Pod 预留的节点上的计算资源。在一个插件中,reserve 扩展和 unreserve 扩展应该成对出现。

如果我们要实现自己的插件,必须向调度框架注册插件并完成配置,另外还必须实现扩展点接口,对应的扩展点接口我们可以在源码 pkg/scheduler/framework/v1alpha1/interface.go 文件中找到,如下所示:

  1. // Plugin is the parent type for all the scheduling framework plugins.
  2. type Plugin interface {
  3. Name() string
  4. }
  5. type QueueSortPlugin interface {
  6. Plugin
  7. Less(*PodInfo, *PodInfo) bool
  8. }
  9. // PreFilterPlugin is an interface that must be implemented by "prefilter" plugins.
  10. // These plugins are called at the beginning of the scheduling cycle.
  11. type PreFilterPlugin interface {
  12. Plugin
  13. PreFilter(pc *PluginContext, p *v1.Pod) *Status
  14. }
  15. // FilterPlugin is an interface for Filter plugins. These plugins are called at the
  16. // filter extension point for filtering out hosts that cannot run a pod.
  17. // This concept used to be called 'predicate' in the original scheduler.
  18. // These plugins should return "Success", "Unschedulable" or "Error" in Status.code.
  19. // However, the scheduler accepts other valid codes as well.
  20. // Anything other than "Success" will lead to exclusion of the given host from
  21. // running the pod.
  22. type FilterPlugin interface {
  23. Plugin
  24. Filter(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
  25. }
  26. // PostFilterPlugin is an interface for Post-filter plugin. Post-filter is an
  27. // informational extension point. Plugins will be called with a list of nodes
  28. // that passed the filtering phase. A plugin may use this data to update internal
  29. // state or to generate logs/metrics.
  30. type PostFilterPlugin interface {
  31. Plugin
  32. PostFilter(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status
  33. }
  34. // ScorePlugin is an interface that must be implemented by "score" plugins to rank
  35. // nodes that passed the filtering phase.
  36. type ScorePlugin interface {
  37. Plugin
  38. Score(pc *PluginContext, p *v1.Pod, nodeName string) (int, *Status)
  39. }
  40. // ScoreWithNormalizePlugin is an interface that must be implemented by "score"
  41. // plugins that also need to normalize the node scoring results produced by the same
  42. // plugin's "Score" method.
  43. type ScoreWithNormalizePlugin interface {
  44. ScorePlugin
  45. NormalizeScore(pc *PluginContext, p *v1.Pod, scores NodeScoreList) *Status
  46. }
  47. // ReservePlugin is an interface for Reserve plugins. These plugins are called
  48. // at the reservation point. These are meant to update the state of the plugin.
  49. // This concept used to be called 'assume' in the original scheduler.
  50. // These plugins should return only Success or Error in Status.code. However,
  51. // the scheduler accepts other valid codes as well. Anything other than Success
  52. // will lead to rejection of the pod.
  53. type ReservePlugin interface {
  54. Plugin
  55. Reserve(pc *PluginContext, p *v1.Pod, nodeName string) *Status
  56. }
  57. // PreBindPlugin is an interface that must be implemented by "prebind" plugins.
  58. // These plugins are called before a pod being scheduled.
  59. type PreBindPlugin interface {
  60. Plugin
  61. PreBind(pc *PluginContext, p *v1.Pod, nodeName string) *Status
  62. }
  63. // PostBindPlugin is an interface that must be implemented by "postbind" plugins.
  64. // These plugins are called after a pod is successfully bound to a node.
  65. type PostBindPlugin interface {
  66. Plugin
  67. PostBind(pc *PluginContext, p *v1.Pod, nodeName string)
  68. }
  69. // UnreservePlugin is an interface for Unreserve plugins. This is an informational
  70. // extension point. If a pod was reserved and then rejected in a later phase, then
  71. // un-reserve plugins will be notified. Un-reserve plugins should clean up state
  72. // associated with the reserved Pod.
  73. type UnreservePlugin interface {
  74. Plugin
  75. Unreserve(pc *PluginContext, p *v1.Pod, nodeName string)
  76. }
  77. // PermitPlugin is an interface that must be implemented by "permit" plugins.
  78. // These plugins are called before a pod is bound to a node.
  79. type PermitPlugin interface {
  80. Plugin
  81. Permit(pc *PluginContext, p *v1.Pod, nodeName string) (*Status, time.Duration)
  82. }
  83. // BindPlugin is an interface that must be implemented by "bind" plugins. Bind
  84. // plugins are used to bind a pod to a Node.
  85. type BindPlugin interface {
  86. Plugin
  87. Bind(pc *PluginContext, p *v1.Pod, nodeName string) *Status
  88. }

对于调度框架插件的启用或者禁用,我们同样可以使用上面的 KubeSchedulerConfiguration 资源对象来进行配置。下面的例子中的配置启用了一个实现了 reservepreBind 扩展点的插件,并且禁用了另外一个插件,同时为插件 foo 提供了一些配置信息:

  1. apiVersion: kubescheduler.config.k8s.io/v1alpha1
  2. kind: KubeSchedulerConfiguration
  3. ---
  4. plugins:
  5. reserve:
  6. enabled:
  7. - name: foo
  8. - name: bar
  9. disabled:
  10. - name: baz
  11. preBind:
  12. enabled:
  13. - name: foo
  14. disabled:
  15. - name: baz
  16. pluginConfig:
  17. - name: foo
  18. args: >
  19. foo插件可以解析的任意内容

扩展的调用顺序如下:

  • 如果某个扩展点没有配置对应的扩展,调度框架将使用默认插件中的扩展

  • 如果为某个扩展点配置且激活了扩展,则调度框架将先调用默认插件的扩展,再调用配置中的扩展

  • 默认插件的扩展始终被最先调用,然后按照 KubeSchedulerConfiguration 中扩展的激活 enabled 顺序逐个调用扩展点的扩展

  • 可以先禁用默认插件的扩展,然后在 enabled 列表中的某个位置激活默认插件的扩展,这种做法可以改变默认插件的扩展被调用时的顺序

假设默认插件 foo 实现了 reserve 扩展点,此时我们要添加一个插件 bar,想要在 foo 之前被调用,则应该先禁用 foo 再按照 bar foo 的顺序激活。示例配置如下所示:

  1. apiVersion: kubescheduler.config.k8s.io/v1alpha1
  2. kind: KubeSchedulerConfiguration
  3. ---
  4. plugins:
  5. reserve:
  6. enabled:
  7. - name: bar
  8. - name: foo
  9. disabled:
  10. - name: foo

在源码目录 pkg/scheduler/framework/plugins/examples 中有几个示范插件,我们可以参照其实现方式。

示例

其实要实现一个调度框架的插件,并不难,我们只要实现对应的扩展点,然后将插件注册到调度器中即可,下面是默认调度器在初始化的时候注册的插件:

  1. func NewRegistry() Registry {
  2. return Registry{
  3. // FactoryMap:
  4. // New plugins are registered here.
  5. // example:
  6. // {
  7. // stateful_plugin.Name: stateful.NewStatefulMultipointExample,
  8. // fooplugin.Name: fooplugin.New,
  9. // }
  10. }
  11. }

但是可以看到默认并没有注册一些插件,所以要想让调度器能够识别我们的插件代码,就需要自己来实现一个调度器了,当然这个调度器我们完全没必要完全自己实现,直接调用默认的调度器,然后在上面的 NewRegistry() 函数中将我们的插件注册进去即可。在 kube-scheduler 的源码文件 kubernetes/cmd/kube-scheduler/app/server.go 中有一个 NewSchedulerCommand 入口函数,其中的参数是一个类型为 Option 的列表,而这个 Option 恰好就是一个插件配置的定义:

  1. // Option configures a framework.Registry.
  2. type Option func(framework.Registry) error
  3. // NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
  4. func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
  5. ......
  6. }

所以我们完全就可以直接调用这个函数来作为我们的函数入口,并且传入我们自己实现的插件作为参数即可,而且该文件下面还有一个名为 WithPlugin 的函数可以来创建一个 Option 实例:

  1. // WithPlugin creates an Option based on plugin name and factory.
  2. func WithPlugin(name string, factory framework.PluginFactory) Option {
  3. return func(registry framework.Registry) error {
  4. return registry.Register(name, factory)
  5. }
  6. }

所以最终我们的入口函数如下所示:

  1. func main() {
  2. rand.Seed(time.Now().UTC().UnixNano())
  3. command := app.NewSchedulerCommand(
  4. app.WithPlugin(sample.Name, sample.New),
  5. )
  6. logs.InitLogs()
  7. defer logs.FlushLogs()
  8. if err := command.Execute(); err != nil {
  9. _, _ = fmt.Fprintf(os.Stderr, "%v\n", err)
  10. os.Exit(1)
  11. }
  12. }

其中 app.WithPlugin(sample.Name, sample.New) 就是我们接下来要实现的插件,从 WithPlugin 函数的参数也可以看出我们这里的 sample.New 必须是一个 framework.PluginFactory 类型的值,而 PluginFactory 的定义就是一个函数:

type PluginFactory = func(configuration *runtime.Unknown, f FrameworkHandle) (Plugin, error)

所以 sample.New 实际上就是上面的这个函数,在这个函数中我们可以获取到插件中的一些数据然后进行逻辑处理即可,插件实现如下所示,我们这里只是简单获取下数据打印日志,如果你有实际需求的可以根据获取的数据就行处理即可,我们这里只是实现了 PreFilterFilterPreBind 三个扩展点,其他的可以用同样的方式来扩展即可:

  1. // 插件名称
  2. const Name = "sample-plugin"
  3. type Args struct {
  4. FavoriteColor string `json:"favorite_color,omitempty"`
  5. FavoriteNumber int `json:"favorite_number,omitempty"`
  6. ThanksTo string `json:"thanks_to,omitempty"`
  7. }
  8. type Sample struct {
  9. args *Args
  10. handle framework.FrameworkHandle
  11. }
  12. func (s *Sample) Name() string {
  13. return Name
  14. }
  15. func (s *Sample) PreFilter(pc *framework.PluginContext, pod *v1.Pod) *framework.Status {
  16. klog.V(3).Infof("prefilter pod: %v", pod.Name)
  17. return framework.NewStatus(framework.Success, "")
  18. }
  19. func (s *Sample) Filter(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
  20. klog.V(3).Infof("filter pod: %v, node: %v", pod.Name, nodeName)
  21. return framework.NewStatus(framework.Success, "")
  22. }
  23. func (s *Sample) PreBind(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
  24. if nodeInfo, ok := s.handle.NodeInfoSnapshot().NodeInfoMap[nodeName]; !ok {
  25. return framework.NewStatus(framework.Error, fmt.Sprintf("prebind get node info error: %+v", nodeName))
  26. } else {
  27. klog.V(3).Infof("prebind node info: %+v", nodeInfo.Node())
  28. return framework.NewStatus(framework.Success, "")
  29. }
  30. }
  31. //type PluginFactory = func(configuration *runtime.Unknown, f FrameworkHandle) (Plugin, error)
  32. func New(configuration *runtime.Unknown, f framework.FrameworkHandle) (framework.Plugin, error) {
  33. args := &Args{}
  34. if err := framework.DecodeInto(configuration, args); err != nil {
  35. return nil, err
  36. }
  37. klog.V(3).Infof("get plugin config args: %+v", args)
  38. return &Sample{
  39. args: args,
  40. handle: f,
  41. }, nil
  42. }

完整代码可以前往仓库 GitHub - cnych/sample-scheduler-framework: This repo is a sample for Kubernetes scheduler framework. 获取。

实现完成后,编译打包成镜像即可,然后我们就可以当成普通的应用用一个 Deployment 控制器来部署即可,由于我们需要去获取集群中的一些资源对象,所以当然需要申请 RBAC 权限,然后同样通过 --config 参数来配置我们的调度器,同样还是使用一个 KubeSchedulerConfiguration 资源对象配置,可以通过 plugins 来启用或者禁用我们实现的插件,也可以通过 pluginConfig 来传递一些参数值给插件:

  1. kind: ClusterRole
  2. apiVersion: rbac.authorization.k8s.io/v1
  3. metadata:
  4. name: sample-scheduler-clusterrole
  5. rules:
  6. - apiGroups:
  7. - ""
  8. resources:
  9. - endpoints
  10. - events
  11. verbs:
  12. - create
  13. - get
  14. - update
  15. - apiGroups:
  16. - ""
  17. resources:
  18. - nodes
  19. verbs:
  20. - get
  21. - list
  22. - watch
  23. - apiGroups:
  24. - ""
  25. resources:
  26. - pods
  27. verbs:
  28. - delete
  29. - get
  30. - list
  31. - watch
  32. - update
  33. - apiGroups:
  34. - ""
  35. resources:
  36. - bindings
  37. - pods/binding
  38. verbs:
  39. - create
  40. - apiGroups:
  41. - ""
  42. resources:
  43. - pods/status
  44. verbs:
  45. - patch
  46. - update
  47. - apiGroups:
  48. - ""
  49. resources:
  50. - replicationcontrollers
  51. - services
  52. verbs:
  53. - get
  54. - list
  55. - watch
  56. - apiGroups:
  57. - apps
  58. - extensions
  59. resources:
  60. - replicasets
  61. verbs:
  62. - get
  63. - list
  64. - watch
  65. - apiGroups:
  66. - apps
  67. resources:
  68. - statefulsets
  69. verbs:
  70. - get
  71. - list
  72. - watch
  73. - apiGroups:
  74. - policy
  75. resources:
  76. - poddisruptionbudgets
  77. verbs:
  78. - get
  79. - list
  80. - watch
  81. - apiGroups:
  82. - ""
  83. resources:
  84. - persistentvolumeclaims
  85. - persistentvolumes
  86. verbs:
  87. - get
  88. - list
  89. - watch
  90. - apiGroups:
  91. - ""
  92. resources:
  93. - configmaps
  94. verbs:
  95. - get
  96. - list
  97. - watch
  98. - apiGroups:
  99. - "storage.k8s.io"
  100. resources:
  101. - storageclasses
  102. - csinodes
  103. verbs:
  104. - get
  105. - list
  106. - watch
  107. - apiGroups:
  108. - "coordination.k8s.io"
  109. resources:
  110. - leases
  111. verbs:
  112. - create
  113. - get
  114. - list
  115. - update
  116. - apiGroups:
  117. - "events.k8s.io"
  118. resources:
  119. - events
  120. verbs:
  121. - create
  122. - patch
  123. - update
  124. ---
  125. apiVersion: v1
  126. kind: ServiceAccount
  127. metadata:
  128. name: sample-scheduler-sa
  129. namespace: kube-system
  130. ---
  131. kind: ClusterRoleBinding
  132. apiVersion: rbac.authorization.k8s.io/v1
  133. metadata:
  134. name: sample-scheduler-clusterrolebinding
  135. namespace: kube-system
  136. roleRef:
  137. apiGroup: rbac.authorization.k8s.io
  138. kind: ClusterRole
  139. name: sample-scheduler-clusterrole
  140. subjects:
  141. - kind: ServiceAccount
  142. name: sample-scheduler-sa
  143. namespace: kube-system
  144. ---
  145. apiVersion: v1
  146. kind: ConfigMap
  147. metadata:
  148. name: scheduler-config
  149. namespace: kube-system
  150. data:
  151. scheduler-config.yaml: |
  152. apiVersion: kubescheduler.config.k8s.io/v1alpha1
  153. kind: KubeSchedulerConfiguration
  154. schedulerName: sample-scheduler
  155. leaderElection:
  156. leaderElect: true
  157. lockObjectName: sample-scheduler
  158. lockObjectNamespace: kube-system
  159. plugins:
  160. preFilter:
  161. enabled:
  162. - name: "sample-plugin"
  163. filter:
  164. enabled:
  165. - name: "sample-plugin"
  166. preBind:
  167. enabled:
  168. - name: "sample-plugin"
  169. pluginConfig:
  170. - name: "sample-plugin"
  171. args:
  172. favorite_color: "#326CE5"
  173. favorite_number: 7
  174. thanks_to: "thockin"
  175. ---
  176. apiVersion: apps/v1
  177. kind: Deployment
  178. metadata:
  179. name: sample-scheduler
  180. namespace: kube-system
  181. labels:
  182. component: sample-scheduler
  183. spec:
  184. replicas: 1
  185. selector:
  186. matchLabels:
  187. component: sample-scheduler
  188. template:
  189. metadata:
  190. labels:
  191. component: sample-scheduler
  192. spec:
  193. serviceAccount: sample-scheduler-sa
  194. priorityClassName: system-cluster-critical
  195. volumes:
  196. - name: scheduler-config
  197. configMap:
  198. name: scheduler-config
  199. containers:
  200. - name: scheduler-ctrl
  201. image: cnych/sample-scheduler:v0.1.6
  202. imagePullPolicy: IfNotPresent
  203. args:
  204. - sample-scheduler-framework
  205. - --config=/etc/kubernetes/scheduler-config.yaml
  206. - --v=3
  207. resources:
  208. requests:
  209. cpu: "50m"
  210. volumeMounts:
  211. - name: scheduler-config
  212. mountPath: /etc/kubernetes

直接部署上面的资源对象即可,这样我们就部署了一个名为 sample-scheduler 的调度器了,接下来我们可以部署一个应用来使用这个调度器进行调度:

  1. apiVersion: apps/v1
  2. kind: Deployment
  3. metadata:
  4. name: test-scheduler
  5. spec:
  6. replicas: 1
  7. selector:
  8. matchLabels:
  9. app: test-scheduler
  10. template:
  11. metadata:
  12. labels:
  13. app: test-scheduler
  14. spec:
  15. schedulerName: sample-scheduler
  16. containers:
  17. - image: nginx
  18. imagePullPolicy: IfNotPresent
  19. name: nginx
  20. ports:
  21. - containerPort: 80

这里需要注意的是我们现在手动指定了一个 schedulerName 的字段,将其设置成上面我们自定义的调度器名称 sample-scheduler

我们直接创建这个资源对象,创建完成后查看我们自定义调度器的日志信息:

  1. $ kubectl get pods -n kube-system -l component=sample-scheduler
  2. NAME READY STATUS RESTARTS AGE
  3. sample-scheduler-7c469787f-rwhhd 1/1 Running 0 13m
  4. $ kubectl logs -f sample-scheduler-7c469787f-rwhhd -n kube-system
  5. I0104 08:24:22.087881 1 scheduler.go:530] Attempting to schedule pod: default/test-scheduler-6d779d9465-rq2bb
  6. I0104 08:24:22.087992 1 plugins.go:23] prefilter pod: test-scheduler-6d779d9465-rq2bb
  7. I0104 08:24:22.088657 1 plugins.go:28] filter pod: test-scheduler-6d779d9465-rq2bb, node: ydzs-node1
  8. I0104 08:24:22.088797 1 plugins.go:28] filter pod: test-scheduler-6d779d9465-rq2bb, node: ydzs-node2
  9. I0104 08:24:22.088871 1 plugins.go:28] filter pod: test-scheduler-6d779d9465-rq2bb, node: ydzs-node3
  10. I0104 08:24:22.088946 1 plugins.go:28] filter pod: test-scheduler-6d779d9465-rq2bb, node: ydzs-node4
  11. I0104 08:24:22.088992 1 plugins.go:28] filter pod: test-scheduler-6d779d9465-rq2bb, node: ydzs-master
  12. I0104 08:24:22.090653 1 plugins.go:36] prebind node info: &Node{ObjectMeta:{ydzs-node3 /api/v1/nodes/ydzs-node3 1ff6e228-4d98-4737-b6d3-30a5d55ccdc2 15466372 0 2019-11-10 09:05:09 +0000 UTC <nil> <nil> ......}
  13. I0104 08:24:22.091761 1 factory.go:610] Attempting to bind test-scheduler-6d779d9465-rq2bb to ydzs-node3
  14. I0104 08:24:22.104994 1 scheduler.go:667] pod default/test-scheduler-6d779d9465-rq2bb is bound successfully on node "ydzs-node3", 5 nodes evaluated, 4 nodes were found feasible. Bound node resource: "Capacity: CPU<4>|Memory<8008820Ki>|Pods<110>|StorageEphemeral<17921Mi>; Allocatable: CPU<4>|Memory<7906420Ki>|Pods<110>|StorageEphemeral<16912377419>.".

可以看到当我们创建完 Pod 后,在我们自定义的调度器中就出现了对应的日志,并且在我们定义的扩展点上面都出现了对应的日志,证明我们的示例成功了,也可以通过查看 Pod 的 schedulerName 来验证:

  1. $ kubectl get pods
  2. NAME READY STATUS RESTARTS AGE
  3. test-scheduler-6d779d9465-rq2bb 1/1 Running 0 22m
  4. $ kubectl get pod test-scheduler-6d779d9465-rq2bb -o yaml
  5. ......
  6. restartPolicy: Always
  7. schedulerName: sample-scheduler
  8. securityContext: {}
  9. serviceAccount: default
  10. ......

在最新的 Kubernetes v1.17 版本中,Scheduler Framework 内置的预选和优选函数已经全部插件化,所以要扩展调度器我们应该掌握并理解调度框架这种方式。

5. 疑问和思考

5.1 在k8s环境中,由于调度对于机器的资源只考虑request类型,实际上一台服务器的cpu使用率可能已经很高,但是依然被分配了pod上去,能否实现一种调度器,将cpu使用率作为一种考虑?该如何实现?

是的。

在实际情况中,pod申请到的资源并不一定会被pod使用,而k8s层面多数并没有进行资源绑核,因此就会导致cpu被分配出去实际上并没有被使用的情况。对于k8s而言,紧紧依赖request资源调度pod到运行的pod,这种类型的策略有点单薄,但是当前的调度策略中并没有该类型的调度策略方式。 因此需要自定义研发调度策略。

自定义研发调度策略最难的还是指标采集链路,因为k8s容器调度是核心链路环节,而k8s调度器在申请调度时如果依赖外部的指标采集,从而获取cpu、内存的使用率情况,这会引入一个依赖,并且通常该依赖的链条都比较长。同时由于是监控系统,通常公司级别对于监控系统的优先级定位并不高,因此对应的服务性难以保证,因此在实现策略时可以这样。

  1. 定时采集外部指标,并进行缓存机器指标

  2. 在调度策略中,对于机器的优选和过滤环节,加入cpu、内存使用率的指标考虑,参与调度逻辑

  3. 充分考虑降级策略,如果出现指标延迟、脏数据、坏数据等情况,能够及时的降级到默认调度策略,避免核心链路环节异常或者调度缓慢。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/732648
推荐阅读
  

闽ICP备14008679号