当前位置:   article > 正文

Kubernetes 开发【5】—— scheduling framework 构建自定义调度插件

scheduling framework

目录

环境信息

背景

前言

主体框架

插件获取 clientSet/informer

设置插件调度参数

总结


环境信息

kubernetes:1.22

centos:7.9

apiVersion: kubescheduler.config.k8s.io/v1beta2(即将在1.25中被废弃)

背景

对于创建一个pod,我们知道有以下流程

在调度/绑定的不同阶段,我们都可以注入我们自定义的插件,来对调度流程进行一定的干预。

调度框架Scheduling Framework正是实现这一功能的工具手段。

由于调度器涉及到调度算法,本文在这方面不进行深究,仅记录Scheduling Framework的搭建和简单使用

前言

对于不同插件的基本描述,摘自官方文档

PreFilter

这些插件用于预处理 Pod 的相关信息,或者检查集群或 Pod 必须满足的某些条件。 如果 PreFilter 插件返回错误,则调度周期将终止。

Filter

这些插件用于过滤出不能运行该 Pod 的节点。对于每个节点, 调度器将按照其配置顺序调用这些过滤插件。如果任何过滤插件将节点标记为不可行, 则不会为该节点调用剩下的过滤插件。节点可以被同时进行评估。

PostFilter

这些插件在 Filter 阶段后调用,但仅在该 Pod 没有可行的节点时调用。 插件按其配置的顺序调用。如果任何 PostFilter 插件标记节点为“Schedulable”, 则其余的插件不会调用。典型的 PostFilter 实现是抢占,试图通过抢占其他 Pod 的资源使该 Pod 可以调度。

PreScore

这些插件用于执行 “前置评分(pre-scoring)” 工作,即生成一个可共享状态供 Score 插件使用。 如果 PreScore 插件返回错误,则调度周期将终止。

Score

这些插件用于对通过过滤阶段的节点进行排序。调度器将为每个节点调用每个评分插件。 将有一个定义明确的整数范围,代表最小和最大分数。 在标准化评分阶段之后,调度器将根据配置的插件权重 合并所有插件的节点分数。

主体框架

首先参考官方的示例代码

GitHub - kubernetes-sigs/scheduler-plugins: Repository for out-of-tree scheduler plugins based on scheduler framework.

我们构建出main.go

  1. package main
  2. import (
  3. "fmt"
  4. "k8s.io/component-base/logs"
  5. "k8s.io/kubernetes/cmd/kube-scheduler/app"
  6. "myscheduler/lib"
  7. "os"
  8. )
  9. func main() {
  10. //来自/blob/master/cmd/scheduler/main.go
  11. command := app.NewSchedulerCommand(
  12. //可变参数——需要注入的插件列表
  13. app.WithPlugin(lib.TestSchedulingName, lib.NewTestScheduling),
  14. )
  15. logs.InitLogs()
  16. defer logs.FlushLogs()
  17. if err := command.Execute(); err != nil {
  18. _, _ = fmt.Fprintf(os.Stderr, "%v\n", err)
  19. os.Exit(1)
  20. }
  21. }

其中,需要我们定义好插件的名称和相关实现,因此引出如下的代码框架

  1. //出自/pkg/capacityscheduling 只留了主体框架,简化了大部分
  2. const TestSchedulingName = "test-scheduling" //记住这个调度器名称
  3. type TestScheduling struct {}
  4. func (*TestScheduling) Name() string { //实现framework.Plugin的接口方法
  5. return TestSchedulingName
  6. }
  7. func NewTestScheduling(configuration runtime.Object, f framework.Handle) (framework.Plugin, error) {
  8. return &TestScheduling{}, nil
  9. }

插件方法

不同阶段的插件在framework包里其实就是不同的接口,我们要注入对应阶段的插件,那么必须实现相应的接口方法,这里参照官方的方法,通过goland快速生成preFilter的接口方法。

var _ framework.PreFilterPlugin = &TestScheduling{}

生成了两个接口方法

  1. //业务方法
  2. func PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) *framework.Status
  3. //这个方法是在生成pod或删除pod时产生一些需要评估的内容,返回值同样是个接口,返回自身并快速生成接口方法即可
  4. func PreFilterExtensions() framework.PreFilterExtensions

在其中实现我们所要进行的干预即可。

注册调度器

代码编译,打包镜像的步骤省略~

因为调度器pod需要访问apiserver,所以需要指定serviceaccount并且绑定权限

  1. kind: ClusterRole
  2. apiVersion: rbac.authorization.k8s.io/v1
  3. metadata:
  4. name: test-scheduling-clusterrole
  5. rules:
  6. - apiGroups:
  7. - ""
  8. resources:
  9. - endpoints
  10. - events
  11. verbs:
  12. - create
  13. - get
  14. - update
  15. - apiGroups:
  16. - ""
  17. resources:
  18. - nodes
  19. verbs:
  20. - get
  21. - list
  22. - watch
  23. - apiGroups:
  24. - ""
  25. resources:
  26. - pods
  27. verbs:
  28. - delete
  29. - get
  30. - list
  31. - watch
  32. - update
  33. - apiGroups:
  34. - ""
  35. resources:
  36. - bindings
  37. - pods/binding
  38. verbs:
  39. - create
  40. - apiGroups:
  41. - ""
  42. resources:
  43. - pods/status
  44. verbs:
  45. - patch
  46. - update
  47. - apiGroups:
  48. - ""
  49. resources:
  50. - replicationcontrollers
  51. - services
  52. verbs:
  53. - get
  54. - list
  55. - watch
  56. - apiGroups:
  57. - apps
  58. - extensions
  59. resources:
  60. - replicasets
  61. verbs:
  62. - get
  63. - list
  64. - watch
  65. - apiGroups:
  66. - apps
  67. resources:
  68. - statefulsets
  69. verbs:
  70. - get
  71. - list
  72. - watch
  73. - apiGroups:
  74. - policy
  75. resources:
  76. - poddisruptionbudgets
  77. verbs:
  78. - get
  79. - list
  80. - watch
  81. - apiGroups:
  82. - ""
  83. resources:
  84. - persistentvolumeclaims
  85. - persistentvolumes
  86. verbs:
  87. - get
  88. - list
  89. - watch
  90. - apiGroups:
  91. - ""
  92. resources:
  93. - namespaces
  94. - configmaps
  95. verbs:
  96. - get
  97. - list
  98. - watch
  99. - apiGroups:
  100. - "storage.k8s.io"
  101. resources: ['*']
  102. verbs:
  103. - get
  104. - list
  105. - watch
  106. - apiGroups:
  107. - "coordination.k8s.io"
  108. resources:
  109. - leases
  110. verbs:
  111. - create
  112. - get
  113. - list
  114. - update
  115. - apiGroups:
  116. - "events.k8s.io"
  117. resources:
  118. - events
  119. verbs:
  120. - create
  121. - patch
  122. - update
  123. ---
  124. apiVersion: v1
  125. kind: ServiceAccount
  126. metadata:
  127. name: test-scheduling-sa
  128. namespace: kube-system
  129. ---
  130. kind: ClusterRoleBinding
  131. apiVersion: rbac.authorization.k8s.io/v1
  132. metadata:
  133. name: test-scheduling-clusterrolebinding
  134. namespace: kube-system
  135. roleRef:
  136. apiGroup: rbac.authorization.k8s.io
  137. kind: ClusterRole
  138. name: test-scheduling-clusterrole
  139. subjects:
  140. - kind: ServiceAccount
  141. name: test-scheduling-sa
  142. namespace: kube-system

调度器启动时,需要引用相应的配置文件来进行插件类型的注册,参数的设置等,我们通过configMap来将配置挂载进容器

  1. apiVersion: v1
  2. kind: ConfigMap
  3. metadata:
  4. name: test-scheduling-config
  5. namespace: kube-system
  6. data:
  7. config.yaml: |
  8. apiVersion: kubescheduler.config.k8s.io/v1beta2
  9. kind: KubeSchedulerConfiguration
  10. leaderElection:
  11. leaderElect: false
  12. profiles:
  13. - schedulerName: test-scheduling
  14. plugins:
  15. preFilter:
  16. enabled:
  17. - name: "test-scheduling"

接下来是对调度器的定义,以固定节点并将可执行文件挂载的方式运行(仅用于测试)

  1. apiVersion: apps/v1
  2. kind: Deployment
  3. metadata:
  4. name: test-scheduling
  5. namespace: kube-system
  6. spec:
  7. replicas: 1
  8. selector:
  9. matchLabels:
  10. app: test-scheduling
  11. template:
  12. metadata:
  13. labels:
  14. app: test-scheduling
  15. spec:
  16. nodeName: master-01
  17. serviceAccount: test-scheduling-sa
  18. containers:
  19. - name: tests-cheduling
  20. image: alpine:3.12
  21. imagePullPolicy: IfNotPresent
  22. command: ["/app/test-scheduling"]
  23. args:
  24. - --config=/etc/kubernetes/config.yaml
  25. - --v=3
  26. volumeMounts:
  27. - name: config
  28. mountPath: /etc/kubernetes
  29. - name: app
  30. mountPath: /app
  31. volumes:
  32. - name: config
  33. configMap:
  34. name: test-scheduling-config
  35. - name: app
  36. hostPath:
  37. path: /root/schedular

 检查调度器状态,设置为Running后,即可以在创建负载时在Pod的配置中的schedulerName项指定调度器。

  1. apiVersion: apps/v1
  2. kind: Deployment
  3. metadata:
  4. name: testngx
  5. namespace: default
  6. spec:
  7. replicas: 1
  8. selector:
  9. matchLabels:
  10. app: testngx
  11. template:
  12. metadata:
  13. labels:
  14. app: testngx
  15. spec:
  16. schedulerName: test-scheduling
  17. containers:
  18. - image: nginx:1.18-alpine
  19. imagePullPolicy: IfNotPresent
  20. name: testngx
  21. ports:
  22. - containerPort: 80

观察调度器的日志,可以发现,成功进行了干预

kubectl logs test-scheduling-54fd7c585f-gmbb6 -n kube-system -f

I1117 08:51:46.567953       1 eventhandlers.go:123] "Add event for unscheduled pod" pod="default/testngx-7cd55446f7-4cmgv"

I1117 08:51:46.568030       1 scheduler.go:516] "Attempting to schedule pod" pod="default/testngx-7cd55446f7-4cmgv"

I1117 08:51:46.568094       1 test-scheduling.go:57] 预过滤

插件获取 clientSet/informer

从本节开始,介绍几种插件中的常规做法;

 首先是go-client的获取,示例场景:在filter插件中,过滤带有xx标签的节点。

我们观察到,插件结构体的构造函数中,有这么一个入参:

f framework.Handle

这个Handle类型,可以帮助我们获取clientSet或者是informer;这里以获取informer为例。

那么,就有新的成员变量以及构造函数

  1. type TestScheduling struct {
  2. fac informers.SharedInformerFactory
  3. }
  1. func NewTestScheduling(configuration runtime.Object, f framework.Handle) (framework.Plugin, error) {
  2. return &TestScheduling{
  3. fac: f.SharedInformerFactory(),
  4. }, nil //注入informer工厂
  5. }

 那么就可以在插件接口方法里去实现这个逻辑了

  1. func (s *TestScheduling) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
  2. klog.V(3).Infof("过滤节点")
  3. for k, v := range nodeInfo.Node().Labels {
  4. if k == "scheduling" && v != "true" {
  5. return framework.NewStatus(framework.Unschedulable, "设置了不可调度的标签")
  6. }
  7. }
  8. return framework.NewStatus(framework.Success)
  9. }

同样需要在调度器的配置文件中开启这个插件

  1. apiVersion: v1
  2. kind: ConfigMap
  3. metadata:
  4. name: test-scheduling-config
  5. namespace: kube-system
  6. data:
  7. config.yaml: |
  8. apiVersion: kubescheduler.config.k8s.io/v1beta2
  9. kind: KubeSchedulerConfiguration
  10. leaderElection:
  11. leaderElect: false
  12. profiles:
  13. - schedulerName: test-scheduling
  14. plugins:
  15. preFilter:
  16. enabled:
  17. - name: "test-scheduling"
  18. filter:
  19. enabled:
  20. - name: "test-scheduling"

为集群中的一个节点打上相应的标签

kubectl label node node-01 scheduling=false

然后创建负载,观察到pod为pending状态 

testngx-677b6896b-nqsk8   0/1     Pending   0          5s

查看pod事件,观察到节点因为设置了不可调度的标签而被过滤掉了

Events:

  Type     Reason            Age   From             Message

  ----     ------            ----  ----             -------

  Warning  FailedScheduling  28s   test-scheduling  0/3 nodes are available: 1 node(s) didn't match Pod's node affinity/selector, 1 node(s) had taint {node-role.kubernetes.io/master: }, that the pod didn't tolerate, 1 设置了不可调度的标签.

至此,调度器中clientset/informer的获取和调度失败的示例演示就完成了。

值得一提的是,如果负载yaml通过nodeName固定了节点,那么shcedulerName配置的调度器将不作用于Pod的调度中,即使是filter插件中的逻辑过滤了所要固定的节点时。

设置插件调度参数

本节演示通过设置配置文件的参数,来让调度器动态读取。

示例场景是如果创建负载所在的命名空间的Pod数量超过了n,则返回调度失败。因为不涉及节点筛选阶段,所以最好的实践是在prefilter插件中实现。

调度器配置文件中有如下配置:

  1. apiVersion: v1
  2. kind: ConfigMap
  3. metadata:
  4. name: test-scheduling-config
  5. namespace: kube-system
  6. data:
  7. config.yaml: |
  8. apiVersion: kubescheduler.config.k8s.io/v1beta2
  9. kind: KubeSchedulerConfiguration
  10. leaderElection:
  11. leaderElect: false
  12. profiles:
  13. - schedulerName: test-scheduling
  14. plugins:
  15. preFilter:
  16. enabled:
  17. - name: "test-scheduling"
  18. filter:
  19. enabled:
  20. - name: "test-scheduling"
  21. pluginConfig:
  22. - name: test-scheduling
  23. args:
  24. maxPods: 5

其中设定了最大 Pod数量为5。

将这个参数作为成员变量加入到调度器结构体中

  1. type TestScheduling struct {
  2. fac informers.SharedInformerFactory
  3. args *Args
  4. }
  5. type Args struct {
  6. MaxPods int `json:"maxPods,omitempty"`
  7. }

在构造函数中,有这样一个入参:

configuration runtime.Object

这其中包含了我们在配置文件中的配置,将其反解到我们的配置结构体中,并在构造函数中赋值

  1. func NewTestScheduling(configuration runtime.Object, f framework.Handle) (framework.Plugin, error) {
  2. args := &Args{}
  3. if err := frameworkruntime.DecodeInto(configuration, args); err != nil { //由配置文件注入参数,并通过configuration获取
  4. return nil, err
  5. }
  6. return &TestScheduling{
  7. fac: f.SharedInformerFactory(),
  8. args: args,
  9. }, nil //注入informer工厂
  10. }

在filter插件的接口函数中直接取即可。

  1. func (s *TestScheduling) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) *framework.Status {
  2. klog.V(3).Infof("预过滤")
  3. pods, err := s.fac.Core().V1().Pods().Lister().Pods(p.Namespace).List(labels.Everything())
  4. if err != nil {
  5. return framework.NewStatus(framework.Error, err.Error())
  6. }
  7. if len(pods) > s.args.MaxPods {
  8. return framework.NewStatus(framework.Unschedulable, "pod数量超过了最大限制")
  9. }
  10. return framework.NewStatus(framework.Success)
  11. }

调度失败的观察结果与上一节类似,这里省略了。

总结

 至此,我们完成硬性过滤插件中一些简单的玩法。后面我们将演示软性打分插件prescore/score插件中的一些基操,包括presocre预打分插件中抓取和存入原始数据,这份数据如果和prescore和score插件中传递,以及如何通过Normalize归一化处理来使多个插件协同打分,使最终得分落在一个标定的区间。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/732627
推荐阅读
相关标签
  

闽ICP备14008679号