赞
踩
如果文章对你有用,可以点赞、好评、转发
文章是在公司写的,代码都被加密了,也不能上传github,所以就没有上传源代码到github了,不过按文章来,基本没问题,如果有问题,可以评论区粘贴问题
建议全屏阅读,因为有些行比较长
一:简单的,无参数,无自定义配置文件。在本文中,将会带你开发一个最简单的,能运行的调度器。
X:app.WithPlugin注册插件(name,pluginFactoryFunction)
X:声明插件对象和创建插件工厂函数pluginFactoryFunction,可以叫任意名字,用来创建我们的插件对象
X:实现各个插件接口。我们自己的插件对象会实现多个插件接口(我们这里实现prefilter和score插件接口)
X:配置KubeSchedulerConfiguration.yaml文件,这是调度器的配置文件,一定要有,因为我们必须在配置文件的profiles字段中声明我们的插件,他才会启用我们的插件,否则他会忽略
X:配置程序启动参数以便本地调试
X:创建一个pod来测试一下
二:第一版的基础上,在配置文件中带插件参数版
由k8s自动加载
X:修改NewDynamicScheduler函数,我们在配置文件中传递了一个插件参数给它
X:创建我们自定义的插件参数对象DynamicArgs,这个对象需要注册到scheduler自带的scheme中,否则k8s会无法从文件加载和转换我们自定义的对象
X:创建doc
X:创建types,必须,用来声明结构体
X:创建defaults,可选
X:创建register,必须,就是手动把pair(gvk,struct)注册到k8s内建的scheme中,以便k8s可以自动加载识别创建我们的对象(如DynamicArgs)
X:code-gen创建defaulter,可选,用来在初始化一个对象后对对象进行处理
X:code-gen创建deppcopy函数,必须,因为DynamicArgs必须实现runtime.Object接口
X:code-gen创建converter函数,必须,用于在不同的版本之间转换资源,可以用convertion-gen工具自动生成(如果有特殊需要可以删除对应的自动实现的函数,并在同一个包下手动实现一份新的函数,推荐在另一个文件中实现,因为convertion-gen每次生成时都会清空旧文件
X:把所有版本的DynamicArgs的pair/defaulter/convert注册到k8s自带的scheme中
X:强制加载我们的文件,以便强制执行文件对应的init函数,强制注册到scheme中(否则有可能我们没有用到那些文件,从而导致没有加载,从而导致没有注册,从而导致启动失败)
三:在第二版的基础上,带自定义配置文件版本
我们手动加载。流程和第二版一样,只是pair注册的scheme不同
X:创建我们自定义的对象(比如插件参数),这些对象无需k8s感知,所以可以注册到全新的scheme中,由我们自己手动加载
1:创建doc,用来写codegen注释
2:创建types,用来声明结构体
3:创建convert函数,必须实现,可以用convertion-gen工具自动生成(如果有特殊需要可以删除对应的自动实现的函数,
并在同一个包下手动实现一份新的函数,推荐在另一个文件中实现,因为convertion-gen每次生成时都会清空旧文件
4:创建defaulter,可以不实现,用来在初始话一个对象后对对象进行除了
5:创建register,就是手动把pair(gvk,struct)注册到一个全新的scheme中,然后我们手动加载(DynamicPolicy对象就是我们手动从policy.yaml文件中加载)
或者注册到k8s内建的scheme中,以便k8s可以自动加载识别创建我们的对象(如DynamicArgs)
我们下面就实现一个我们自己的第一版DynamicScheduler,我们把代码放到dynamic包内,文末我会贴出go.mod文件,然后go mod tidy就行了,就不用自己去一个一个的go get了
1:我们注册了一个新插件,该插件名字由dynamic.Name指定,该插件对象可以通过dynamic.NewDynamicScheduler函数构造
//main.go //整个main文件,就是实现一个通过app.WithPlugin注册新插件到register的功能,其余内容和scheduler.go(k8s调度器入口文件)里面的main函数一样 import "k8s.io/kubernetes/cmd/kube-scheduler/app" //通过goland,sync一下dependency就行,其余的包自己引入即可 func main() { apis.LoadAllScheme() rand.Seed(time.Now().UTC().UnixNano()) cmd := app.NewSchedulerCommand( app.WithPlugin(dynamic.Name, dynamic.NewDynamicScheduler), //注意,这里只是注册到register,只是告诉k8s如果有一个叫dynamic.Name的插件,并不会启用 //我们还必须在kubeSchedulerconfiguration.yaml文件的profiles字段中声明启用他才会启用我们的插件 ) logs.InitLogs() defer logs.FlushLogs() if err := cmd.Execute(); err != nil { _, _ = fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } }
2:我们实现插件工厂函数dynamic.NewDynamicScheduler,这个函数返回一个实现了framework.Plugin接口的插件对象
3:实现插件接口,我们这里实现一个prefilter接口和score接口,其中prefilter中做一些初始化操作,score负责打分
package dynamic import ( "context" "fmt" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/kubernetes/pkg/scheduler/framework" "math/rand" "sync" "time" ) var _ framework.ScorePlugin = &DynamicScheduler{} //确保我们的插件实现了ScorePlugin接口,如果没有实现,那么此处会编译报错 var _ framework.PreFilterPlugin = &DynamicScheduler{} //同上 //这个结构体用来代表scheduler的内部状态,并演示通过CycleState来传递变量,而CycleState要求SchedulerState实现StateData接口(即实现clone()StateData方法) type SchedulerState struct { Lock sync.Mutex //调度插件对象是可以被并发访问的(比如对多个节点同时进行打分,且打分过程需要访问这个变量),访问和修改内部状态时需要加锁保护,看需要改成读写锁 Value int } func (ss *SchedulerState) Clone() framework.StateData { ss.Lock.Lock() // 加锁 defer ss.Lock.Unlock() newSs := &SchedulerState{Value: ss.Value} //深复制,这是为了简单用的int return newSs //直接返回对象就行,因为newSs实现了clone方法,所以newSs实现了StateData接口,所以可以用StateData来引用newSs } func (ss *SchedulerState) Update(value int) { //模拟改变状态 ss.Lock.Lock() // 加锁 defer ss.Lock.Unlock() ss.Value = value } func UpdateMetrics() int { //模拟更新metrics return rand.Intn(10) } func DoSomethingWithState(ss *SchedulerState) { /* 假设我们调度器的逻辑为:打分时要参考该某个prometheus指标 那么我们可以开两个线程,一个更新线程不断通过prometheus去拉取指标并保存到一个地方,然后我们的调度器在打分的时候再去读取这个指标 也就是说我们初始化一个state变量,然后把他的指针同时传递给更新线程和我们的调度器对象,这样就可以实现上述目的了,正因为存在竞争,所以state变量要是线程安全的 */ for { metricsValue := UpdateMetrics() //更新metrics ss.Update(metricsValue) //把最新的metrics更新到state变量内部,因为是指针,所以scheduler中也可以看到更新后的数据 time.Sleep(1 * time.Second) //模拟定时更新 } } var Name = "Dynamic" //调度器名字 type DynamicScheduler struct { //我们的插件对象,虽然叫scheduler,实际我们实现的是插件,k8s会把我们的插件安装到它创建的scheduler上,那个才叫scheduler SchedulerState *SchedulerState //通常用一个指针变量来代表DynamicScheduler的内部状态,这个state也可以是外部传进来的一个对象,这样调度器和外部就可能在对这个变量的读写访问上发生竞争 Handle framework.Handle //我们通过这个变量来在运行时获取k8s相关信息 } func (ds *DynamicScheduler) Name() string { //这个函数返回值对应插件的名字,这个返回值必须和我们用app.WithPlugin中使用的名字一样 return Name } func NewDynamicScheduler(plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) { //插件工厂函数,用来创建插件对象 fmt.Println("I am NewDynamicScheduler") state := &SchedulerState{ Value: 20240217, } go DoSomethingWithState(state) //一个线程去不断更新这个状态 return &DynamicScheduler{ Handle: h, //保存k8s传给我们的handler,我们可以用这个来获取nodes信息等信息 SchedulerState: state, //scheduler则通过这个指针去读取最新的状态 }, nil } //实现prefilter插件,本次调度周期中prefilter函数在filter之前执行,并且只会执行一次,可以用来执行一些初始化操作 func (ds *DynamicScheduler) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) *framework.Status { p.ClusterName = "IamClusterName" //p是指针,所以我们修改了p,在score中访问p时可以看到修改后的信息 // 如果state.Clone方法没有加锁,那么为了避免并发问题,这里需要加锁,但是这里schedulerstate本身实现了并发安全,所以DynamicScheduler这里不用自己准备一把锁 schedulerStateSnap := ds.SchedulerState.Clone() //对scheduler内部状态做一个快照,因为在一次调度周期中调度器内部状态可能会变, //假设我们对每个节点打分的时候需要参考这个内部状态,如果不同的节点看到不同的状态,就是说打分参考变了,打出的结果可能就没那么好 state.Write("SchedulerStateSnap", schedulerStateSnap) //把我们的快照信息保存到state变量中,key="SchedulerStateSnap",这个state变量贯穿本次调度,所以可以用来在不同的阶段之间传递信息 //至于nodeInfo这些k8s本身会做快照,所以我们这里一般只需要对自己的内部状态做快照 return nil } func (ds *DynamicScheduler) PreFilterExtensions() framework.PreFilterExtensions { return nil } //打分插件 func (ds *DynamicScheduler) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) { fmt.Println(pod.ClusterName) //会打印出"IamClusterName" snapData, err := state.Read("SchedulerStateSnap") //我们在prefilter阶段保存一个值,放到state里,然后这里通过key把数据读出来 if err != nil { return 0, framework.NewStatus(framework.Error, "Error1") } snap, ok := snapData.(*SchedulerState) //强转一下 if !ok { return 0, framework.NewStatus(framework.Error, "Error2") } fmt.Println(snap.Value) //注意:我们前面保存的是指针变量,如果只有读,那么SchedulerState无需加锁, //如果既要读快照又要写快照,那么SchedulerState变量就必须是并发安全的,即内部读写操作都有锁,不过既然叫快照,那么一般就只有读需求 return 0, nil //status可以为nil,如果status为nil,那么k8s调度框架会认为是成功。因为k8s除了我们的打分插件还会有其他的打分插件,所以我们直接返回0即可,即什么都不干 } func (ds *DynamicScheduler) ScoreExtensions() framework.ScoreExtensions { return nil //我们只需要一个打分,打分以后什么都不做,所以空函数就行 }
4:配置kubeSchedulerconfiguration.yaml文件,文件名可以随便取,也可以叫xx.yaml,反正这个文件是由命令行参数 --config指定
apiVersion: kubescheduler.config.k8s.io/v1beta2 #我们使用的是kubescheduler.config.k8s.io组下面的v1beta2版本的kubeSchedulerconfiguration kind: KubeSchedulerConfiguration #这个配置文件会被解析成一个KubeSchedulerConfiguration对象 leaderElection: #如果选举所使用的锁不存在,他会自动创建 leaderElect: true #开启选举,如果设置为false,那么就不开选举,开选举是为了多副本 resourceName: dynamic-scheduler-lock #必须配置,如果没有配置,则竞争kube-system命名空间叫做kube-scheduler的leases锁,而系统已经有一个默认调度器了,所以会一直卡在选举这里 resourceLock: leases #锁的种类,一般都是leases resourceNamespace: dynamic-system #锁所在的命名空间,随便,只要该命名空间存在即可 clientConnection: #我们运行main程序后需要一个kube-config文件去连apiserver,当运行scheduler的时候我们还需要一个传kube-config文件路径来连集群 kubeconfig: ./config/my-kube-config.yml #为了本地调试,我们需要使用kube-config文件的方式去连集群,我们需要手动传递一个kube-config文件路径参数给他 profiles: #这是一个数组,表示可以有多个调度器,调度器由schedulerName来区分,pod指定调度器时就是指定这个调度器名字 - schedulerName: local-scheduler #调度器名字随便叫 plugins: preFilter: #即使Dynamic插件实现了preFilter插件,我们也通过app.WithPlugin注册了插件Dynamic,它都不会启用我们的插件 enabled: #只有我们在profiles.plugins.preFilter.enabled中声明了插件Dynamic,他才会最终启用我们的插件 - name: Dynamic score: enabled: - name: Dynamic weight: 3 pluginConfig: - name: Dynamic args: //k8s在调用NewDynamicScheduler函数创建插件的时候会把此处args下面的参数包装成一个叫做DynamicArgs的对象,此处我们只是演示一下参数怎么配置,实际未使用 policyConfigPath: ./config/my-policy.yaml #我们的DynamicScheduler自己定义的配置文件路径,比如说拉取哪些指标,每个指标的权重是多少,采取什么打分策略等 promeAddr: "http://a.b.c.d:9090" #prometheus的地址 metricsUpdatePeriodBySeconds: 5m #更新间隔 clusterName: chinaCluster #集群名字,prometheus拉取指标时,如果有多个集群,那么就需要根据集群名字来过滤指标,因为scheduler只会在一个集群运行
5:配置命令行启动参数,然后启动
命令行会输出一行信息:I am NewDynamicScheduler
我用的goland,编辑启动参数,需要三个命令行参数 ,
--leader-elect表示是否开启选举,会覆盖配置文件里的leader-elect选项(未验证),
--kubeconfig表示kubeconfig文件位置,用来在程序启动时去连apiserver,注意在--config指定的文件中还必须配置clientConnection选项
--config表示调度器的配置文件即kubeSchedulerConfiguration.yaml文件
最终的启动参数:
--leader-elect=true
--config=./config/my-scheduler-config.yaml
--kubeconfig=./config/my-kube-config.yml
6:创建一个pod来测试一下
我们部署以后记得看goland的输出,如果输出如下,说明我们的程序正常运行。因为他每个节点都会打一次分,所以有几个节点就会输出几次
IamClusterName
2
kind: Deployment apiVersion: apps/v1 metadata: name: nginx spec: replicas: 1 selector: matchLabels: app: nginx template: metadata: labels: app: nginx spec: containers: - name: container image: 这里自己去docker找一个镜像地址填到这里 ports: - containerPort: 8080 protocol: TCP schedulerName: local-scheduler #调度器的名字,需要合我们在上面的KubeSchedulerConfiguration.yaml文件中
附录:go.mod文件
因为k8s好多包都是使用的外部的库,需要一个一个替换,所以这里直接贴出我用的go mod
module awesomeProject5 go 1.17 replace ( k8s.io/api => k8s.io/api v0.23.3 k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.23.3 k8s.io/apimachinery => k8s.io/apimachinery v0.23.3 k8s.io/apiserver => k8s.io/apiserver v0.23.3 k8s.io/cli-runtime => k8s.io/cli-runtime v0.23.3 k8s.io/client-go => k8s.io/client-go v0.23.3 k8s.io/cloud-provider => k8s.io/cloud-provider v0.23.3 k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.23.3 k8s.io/code-generator => k8s.io/code-generator v0.23.3 k8s.io/component-base => k8s.io/component-base v0.23.3 k8s.io/component-helpers => k8s.io/component-helpers v0.23.3 k8s.io/controller-manager => k8s.io/controller-manager v0.23.3 k8s.io/cri-api => k8s.io/cri-api v0.23.3 k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.23.3 k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.23.3 k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.23.3 k8s.io/kube-proxy => k8s.io/kube-proxy v0.23.3 k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.23.3 k8s.io/kubectl => k8s.io/kubectl v0.23.3 k8s.io/kubelet => k8s.io/kubelet v0.23.3 k8s.io/kubernetes => k8s.io/kubernetes v1.23.3 k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.23.3 k8s.io/metrics => k8s.io/metrics v0.23.3 k8s.io/mount-utils => k8s.io/mount-utils v0.23.3 k8s.io/pod-security-admission => k8s.io/pod-security-admission v0.23.3 k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.23.3 ) require ( k8s.io/api v0.23.3 k8s.io/apimachinery v0.23.3 k8s.io/component-base v0.23.3 k8s.io/kubernetes v0.0.0-00010101000000-000000000000 ) require ( github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/NYTimes/gziphandler v1.1.1 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.2.0 // indirect github.com/blang/semver v3.5.1+incompatible // indirect github.com/cespare/xxhash/v2 v2.1.1 // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/cyphar/filepath-securejoin v0.2.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/docker/distribution v2.7.1+incompatible // indirect github.com/emicklei/go-restful v2.9.5+incompatible // indirect github.com/evanphx/json-patch v4.12.0+incompatible // indirect github.com/felixge/httpsnoop v1.0.1 // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/go-logr/logr v1.2.0 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.19.5 // indirect github.com/go-openapi/swag v0.19.14 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-cmp v0.5.5 // indirect github.com/google/gofuzz v1.1.0 // indirect github.com/google/uuid v1.1.2 // indirect github.com/googleapis/gnostic v0.5.5 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/imdario/mergo v0.3.5 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/mailru/easyjson v0.7.6 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect github.com/moby/term v0.0.0-20210610120745-9d4ed1856297 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/onsi/ginkgo v1.16.5 // indirect github.com/onsi/gomega v1.15.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/runc v1.0.2 // indirect github.com/opencontainers/selinux v1.8.2 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.11.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.28.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect github.com/spf13/cobra v1.2.1 // indirect github.com/spf13/pflag v1.0.5 // indirect go.etcd.io/etcd/api/v3 v3.5.0 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.0 // indirect go.etcd.io/etcd/client/v3 v3.5.0 // indirect go.opentelemetry.io/contrib v0.20.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.20.0 // indirect go.opentelemetry.io/otel v0.20.0 // indirect go.opentelemetry.io/otel/exporters/otlp v0.20.0 // indirect go.opentelemetry.io/otel/metric v0.20.0 // indirect go.opentelemetry.io/otel/sdk v0.20.0 // indirect go.opentelemetry.io/otel/sdk/export/metric v0.20.0 // indirect go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect go.opentelemetry.io/otel/trace v0.20.0 // indirect go.opentelemetry.io/proto/otlp v0.7.0 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.19.0 // indirect golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e // indirect golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2 // indirect google.golang.org/grpc v1.40.0 // indirect google.golang.org/protobuf v1.27.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect k8s.io/apiserver v0.23.3 // indirect k8s.io/client-go v0.23.3 // indirect k8s.io/cloud-provider v0.23.3 // indirect k8s.io/component-helpers v0.23.3 // indirect k8s.io/csi-translation-lib v0.23.3 // indirect k8s.io/klog/v2 v2.30.0 // indirect k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect k8s.io/kube-scheduler v0.0.0 // indirect k8s.io/mount-utils v0.23.3 // indirect k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.27 // indirect sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect sigs.k8s.io/yaml v1.2.0 // indirect )
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。