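The Kubernetes architecture can be divided into three layers: the control plane, the data plane, and the compute plane. The control plane consists of three components: kube-apiserver, kube-controller-manager, and kube-scheduler. The data plane is the etcd cluster. The compute plane consists of the kubelet and kube-proxy components. The control plane and the data plane are usually deployed on the same set of nodes, called Master nodes, while the compute plane runs on Node (worker) nodes. The control plane is the brain of the cluster and is responsible for controlling and scheduling cluster resources. The compute plane runs the workloads and is the target of control-plane scheduling. The data plane stores the cluster's persistent data, such as the configuration files we submit and the cluster state. The cluster is usually operated through the kubectl command-line client.
The dependencies between the modules of a Kubernetes cluster are shown in the figure below:
Figure 1: Kubernetes architecture (source: https://kubernetes.io/zh/docs/concepts/architecture/cloud-controller/)
kubectl sends REST requests to the apiserver from the command line to perform resource scheduling and management operations. After receiving a command, the apiserver persists it to the etcd database. The other modules then interact with the apiserver to carry out the corresponding scheduling work.
Figure 2: Service call dependencies (source: https://blog.csdn.net/u014458692/article/details/108068088)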
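Component overview:
kube-apiserver exposes the Kubernetes API. Every operation on the cluster goes through this API: clients submitting application orchestration commands to the cluster, kubelet reporting resource usage, and all interaction between the components.
kube-controller-manager is responsible for managing the cluster as a whole and keeps the resources in the cluster in their desired state. When it observes that the state of a resource differs from the desired state, the controller-manager triggers the operations needed to bring it back.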
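kube-scheduler performs the actual scheduling work in the cluster. It watches the apiserver for Pods that have not yet been assigned to a node, computes a placement from the requested resources, the scheduling constraints, and the overall resource situation, and records the chosen node through the apiserver. The kubelet on the target node then runs the workload.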
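etcd is an efficient key-value store. In a Kubernetes environment it stores all data that needs to be persisted.
kubelet is the core component on a Node. It interacts with the docker daemon to run docker containers, configures networking and data volumes, and monitors and reports node resource usage for the scheduler to use.
kube-proxy mainly manages the rules for forwarding and load-balancing requests from a Service endpoint to Pod instances.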
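The "desired state versus observed state" idea behind kube-controller-manager can be illustrated with a few lines of Go. This is only a sketch under simplifying assumptions: `reconcile` and the replica-count maps are hypothetical names chosen for illustration and do not come from the Kubernetes source.

```go
package main

import "fmt"

// reconcile compares the desired replica count of each workload with the
// observed count and returns the change needed per workload.
// Hypothetical helper for illustration only.
func reconcile(desired, observed map[string]int) map[string]int {
	delta := map[string]int{}
	for name, want := range desired {
		if diff := want - observed[name]; diff != 0 {
			delta[name] = diff // positive: start replicas, negative: stop replicas
		}
	}
	// Workloads that are running but no longer desired must be removed.
	for name, have := range observed {
		if _, ok := desired[name]; !ok && have > 0 {
			delta[name] = -have
		}
	}
	return delta
}

func main() {
	desired := map[string]int{"web": 3, "db": 1}
	observed := map[string]int{"web": 1, "db": 1, "old-job": 2}
	for name, change := range reconcile(desired, observed) {
		fmt.Printf("%s: %+d replicas\n", name, change)
	}
}
```

A real controller would run this comparison repeatedly, driven by watch events from the apiserver, rather than once.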
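Source versions analyzed in this article: Kubernetes 1.20 and Go 1.14.
Source entry point of kubectl: cmd/kubectl/kubectl.go:35
The cmd package contains the command-line tools commonly used with Kubernetes. The entry point of the kubectl tool is the main function in kubectl.go. It creates the kubectl command object with command := cmd.NewDefaultKubectlCommand() and then calls command.Execute() to run the command and talk to the server-side services of the control plane. The entry-point source is as follows: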
- func main() {
- rand.Seed(time.Now().UnixNano())
-
- command := cmd.NewDefaultKubectlCommand()
-
- // TODO: once we switch everything over to Cobra commands, we can go back to calling
- // cliflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
- // normalize func and add the go flag set by hand.
- pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
- pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
- // cliflag.InitFlags()
- logs.InitLogs()
- defer logs.FlushLogs()
-
- if err := command.Execute(); err != nil {
- os.Exit(1)
- }
- }
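The command-line tool uses the Cobra package to parse its arguments. Once parsing succeeds, a set of template methods drives the whole execution flow.
First, consider how the kubectl command object is created. The function that creates it is NewKubectlCommand at pkg/kubectl/cmd/cmd.go:432. Its source is as follows: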
- // NewKubectlCommand creates the `kubectl` command and its nested children.
- func NewKubectlCommand(in io.Reader, out, err io.Writer) *cobra.Command {
- warningHandler := rest.NewWarningWriter(err, rest.WarningWriterOptions{Deduplicate: true, Color: term.AllowsColorOutput(err)})
- warningsAsErrors := false
-
- // Parent command to which all subcommands are added.
- cmds := &cobra.Command{
- Use: "kubectl",
- Short: i18n.T("kubectl controls the Kubernetes cluster manager"),
- Long: templates.LongDesc(`
- kubectl controls the Kubernetes cluster manager.
- Find more information at:
- https://kubernetes.io/docs/reference/kubectl/overview/`),
- Run: runHelp,
- // Hook before and after Run initialize and write profiles to disk,
- // respectively.
- PersistentPreRunE: func(*cobra.Command, []string) error {
- rest.SetDefaultWarningHandler(warningHandler)
- return initProfiling()
- },
- PersistentPostRunE: func(*cobra.Command, []string) error {
- if err := flushProfiling(); err != nil {
- return err
- }
- if warningsAsErrors {
- count := warningHandler.WarningCount()
- switch count {
- case 0:
- // no warnings
- case 1:
- return fmt.Errorf("%d warning received", count)
- default:
- return fmt.Errorf("%d warnings received", count)
- }
- }
- return nil
- },
- BashCompletionFunction: bashCompletionFunc,
- }
-
- flags := cmds.PersistentFlags()
- flags.SetNormalizeFunc(cliflag.WarnWordSepNormalizeFunc) // Warn for "_" flags
-
- // Normalize all flags that are coming from other packages or pre-configurations
- // a.k.a. change all "_" to "-". e.g. glog package
- flags.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
-
- addProfilingFlags(flags)
-
- flags.BoolVar(&warningsAsErrors, "warnings-as-errors", warningsAsErrors, "Treat warnings received from the server as errors and exit with a non-zero exit code")
-
- kubeConfigFlags := genericclioptions.NewConfigFlags(true).WithDeprecatedPasswordFlag()
- kubeConfigFlags.AddFlags(flags)
- matchVersionKubeConfigFlags := cmdutil.NewMatchVersionFlags(kubeConfigFlags)
- matchVersionKubeConfigFlags.AddFlags(cmds.PersistentFlags())
-
- cmds.PersistentFlags().AddGoFlagSet(flag.CommandLine)
-
- f := cmdutil.NewFactory(matchVersionKubeConfigFlags)
-
- // Sending in 'nil' for the getLanguageFn() results in using
- // the LANG environment variable.
- //
- // TODO: Consider adding a flag or file preference for setting
- // the language, instead of just loading from the LANG env. variable.
- i18n.LoadTranslations("kubectl", nil)
-
- // From this point and forward we get warnings on flags that contain "_" separators
- cmds.SetGlobalNormalizationFunc(cliflag.WarnWordSepNormalizeFunc)
-
- ioStreams := genericclioptions.IOStreams{In: in, Out: out, ErrOut: err}
-
- groups := templates.CommandGroups{
- {
- Message: "Basic Commands (Beginner):",
- Commands: []*cobra.Command{
- create.NewCmdCreate(f, ioStreams),
- expose.NewCmdExposeService(f, ioStreams),
- run.NewCmdRun(f, ioStreams),
- set.NewCmdSet(f, ioStreams),
- },
- },
- {
- Message: "Basic Commands (Intermediate):",
- Commands: []*cobra.Command{
- explain.NewCmdExplain("kubectl", f, ioStreams),
- get.NewCmdGet("kubectl", f, ioStreams),
- edit.NewCmdEdit(f, ioStreams),
- delete.NewCmdDelete(f, ioStreams),
- },
- },
- {
- Message: "Deploy Commands:",
- Commands: []*cobra.Command{
- rollout.NewCmdRollout(f, ioStreams),
- scale.NewCmdScale(f, ioStreams),
- autoscale.NewCmdAutoscale(f, ioStreams),
- },
- },
- {
- Message: "Cluster Management Commands:",
- Commands: []*cobra.Command{
- certificates.NewCmdCertificate(f, ioStreams),
- clusterinfo.NewCmdClusterInfo(f, ioStreams),
- top.NewCmdTop(f, ioStreams),
- drain.NewCmdCordon(f, ioStreams),
- drain.NewCmdUncordon(f, ioStreams),
- drain.NewCmdDrain(f, ioStreams),
- taint.NewCmdTaint(f, ioStreams),
- },
- },
- {
- Message: "Troubleshooting and Debugging Commands:",
- Commands: []*cobra.Command{
- describe.NewCmdDescribe("kubectl", f, ioStreams),
- logs.NewCmdLogs(f, ioStreams),
- attach.NewCmdAttach(f, ioStreams),
- cmdexec.NewCmdExec(f, ioStreams),
- portforward.NewCmdPortForward(f, ioStreams),
- proxy.NewCmdProxy(f, ioStreams),
- cp.NewCmdCp(f, ioStreams),
- auth.NewCmdAuth(f, ioStreams),
- },
- },
- {
- Message: "Advanced Commands:",
- Commands: []*cobra.Command{
- diff.NewCmdDiff(f, ioStreams),
- apply.NewCmdApply("kubectl", f, ioStreams),
- patch.NewCmdPatch(f, ioStreams),
- replace.NewCmdReplace(f, ioStreams),
- wait.NewCmdWait(f, ioStreams),
- convert.NewCmdConvert(f, ioStreams),
- kustomize.NewCmdKustomize(ioStreams),
- },
- },
- {
- Message: "Settings Commands:",
- Commands: []*cobra.Command{
- label.NewCmdLabel(f, ioStreams),
- annotate.NewCmdAnnotate("kubectl", f, ioStreams),
- completion.NewCmdCompletion(ioStreams.Out, ""),
- },
- },
- }
- groups.Add(cmds)
-
- filters := []string{"options"}
-
- // Hide the "alpha" subcommand if there are no alpha commands in this build.
- alpha := cmdpkg.NewCmdAlpha(f, ioStreams)
- if !alpha.HasSubCommands() {
- filters = append(filters, alpha.Name())
- }
-
- templates.ActsAsRootCommand(cmds, filters, groups...)
-
- for name, completion := range bashCompletionFlags {
- if cmds.Flag(name) != nil {
- if cmds.Flag(name).Annotations == nil {
- cmds.Flag(name).Annotations = map[string][]string{}
- }
- cmds.Flag(name).Annotations[cobra.BashCompCustom] = append(
- cmds.Flag(name).Annotations[cobra.BashCompCustom],
- completion,
- )
- }
- }
-
- cmds.AddCommand(alpha)
- cmds.AddCommand(cmdconfig.NewCmdConfig(f, clientcmd.NewDefaultPathOptions(), ioStreams))
- cmds.AddCommand(plugin.NewCmdPlugin(f, ioStreams))
- cmds.AddCommand(version.NewCmdVersion(f, ioStreams))
- cmds.AddCommand(apiresources.NewCmdAPIVersions(f, ioStreams))
- cmds.AddCommand(apiresources.NewCmdAPIResources(f, ioStreams))
- cmds.AddCommand(options.NewCmdOptions(ioStreams.Out))
-
- return cmds
- }
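kubectl's cobra.Command objects form a parent-child tree. The kubectl command is the root node, and the commands in the groups variable are added as its children, covering operations such as creating resources from a file, editing resources, creating namespaces, creating deployments, and creating certificates. A child of kubectl can in turn have children of its own, which are created in the same way.
As an example, take the entry point that builds the kubectl create command, the NewCmdCreate function at vendor/k8s.io/kubectl/pkg/cmd/create/create.go:100. Its source is as follows: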
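To make the tree structure concrete, here is a minimal standard-library sketch of a command tree and of resolving arguments to the deepest matching command. The `Command` type below is a hypothetical, heavily reduced stand-in for cobra.Command; unlike Cobra, it ignores flags entirely.

```go
package main

import (
	"fmt"
	"strings"
)

// Command is a hypothetical, much-reduced stand-in for cobra.Command.
type Command struct {
	Name     string
	parent   *Command
	children []*Command
}

// AddCommand attaches child commands and records their parent.
func (c *Command) AddCommand(cmds ...*Command) {
	for _, sub := range cmds {
		sub.parent = c
		c.children = append(c.children, sub)
	}
}

// Find walks down the tree, consuming leading args that name a child,
// and returns the deepest matching command plus the remaining args.
func (c *Command) Find(args []string) (*Command, []string) {
	cur := c
	for len(args) > 0 {
		var next *Command
		for _, sub := range cur.children {
			if sub.Name == args[0] {
				next = sub
				break
			}
		}
		if next == nil {
			break
		}
		cur, args = next, args[1:]
	}
	return cur, args
}

// Path returns the full command path starting at the root.
func (c *Command) Path() string {
	if c.parent == nil {
		return c.Name
	}
	return c.parent.Path() + " " + c.Name
}

func main() {
	root := &Command{Name: "kubectl"}
	create := &Command{Name: "create"}
	create.AddCommand(&Command{Name: "namespace"}, &Command{Name: "secret"})
	root.AddCommand(create, &Command{Name: "get"})

	cmd, rest := root.Find(strings.Fields("create namespace demo"))
	fmt.Println(cmd.Path(), rest)
}
```

Here the arguments `create namespace demo` resolve to the `namespace` node under `create`, and `demo` is left over as a positional argument for that command.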
- // NewCmdCreate returns new initialized instance of create sub command
- func NewCmdCreate(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
- o := NewCreateOptions(ioStreams)
-
- cmd := &cobra.Command{
- Use: "create -f FILENAME",
- DisableFlagsInUseLine: true,
- Short: i18n.T("Create a resource from a file or from stdin."),
- Long: createLong,
- Example: createExample,
- Run: func(cmd *cobra.Command, args []string) {
- if cmdutil.IsFilenameSliceEmpty(o.FilenameOptions.Filenames, o.FilenameOptions.Kustomize) {
- ioStreams.ErrOut.Write([]byte("Error: must specify one of -f and -k\n\n"))
- defaultRunFunc := cmdutil.DefaultSubCommandRun(ioStreams.ErrOut)
- defaultRunFunc(cmd, args)
- return
- }
- cmdutil.CheckErr(o.Complete(f, cmd))
- cmdutil.CheckErr(o.ValidateArgs(cmd, args))
- cmdutil.CheckErr(o.RunCreate(f, cmd))
- },
- }
-
- // bind flag structs
- o.RecordFlags.AddFlags(cmd)
-
- usage := "to use to create the resource"
- cmdutil.AddFilenameOptionFlags(cmd, &o.FilenameOptions, usage)
- cmdutil.AddValidateFlags(cmd)
- cmd.Flags().BoolVar(&o.EditBeforeCreate, "edit", o.EditBeforeCreate, "Edit the API resource before creating")
- cmd.Flags().Bool("windows-line-endings", runtime.GOOS == "windows",
- "Only relevant if --edit=true. Defaults to the line ending native to your platform.")
- cmdutil.AddApplyAnnotationFlags(cmd)
- cmdutil.AddDryRunFlag(cmd)
- cmd.Flags().StringVarP(&o.Selector, "selector", "l", o.Selector, "Selector (label query) to filter on, supports '=', '==', and '!='.(e.g. -l key1=value1,key2=value2)")
- cmd.Flags().StringVar(&o.Raw, "raw", o.Raw, "Raw URI to POST to the server. Uses the transport specified by the kubeconfig file.")
- cmdutil.AddFieldManagerFlagVar(cmd, &o.fieldManager, "kubectl-create")
-
- o.PrintFlags.AddFlags(cmd)
-
- // create subcommands
- cmd.AddCommand(NewCmdCreateNamespace(f, ioStreams))
- cmd.AddCommand(NewCmdCreateQuota(f, ioStreams))
- cmd.AddCommand(NewCmdCreateSecret(f, ioStreams))
- cmd.AddCommand(NewCmdCreateConfigMap(f, ioStreams))
- cmd.AddCommand(NewCmdCreateServiceAccount(f, ioStreams))
- cmd.AddCommand(NewCmdCreateService(f, ioStreams))
- cmd.AddCommand(NewCmdCreateDeployment(f, ioStreams))
- cmd.AddCommand(NewCmdCreateClusterRole(f, ioStreams))
- cmd.AddCommand(NewCmdCreateClusterRoleBinding(f, ioStreams))
- cmd.AddCommand(NewCmdCreateRole(f, ioStreams))
- cmd.AddCommand(NewCmdCreateRoleBinding(f, ioStreams))
- cmd.AddCommand(NewCmdCreatePodDisruptionBudget(f, ioStreams))
- cmd.AddCommand(NewCmdCreatePriorityClass(f, ioStreams))
- cmd.AddCommand(NewCmdCreateJob(f, ioStreams))
- cmd.AddCommand(NewCmdCreateCronJob(f, ioStreams))
- return cmd
- }
In this code, calls such as cmdutil.AddFilenameOptionFlags, cmdutil.AddValidateFlags, cmd.Flags().BoolVar, and cmd.Flags().Bool register the command-line flags of the create command, and cmd.AddCommand() adds the subcommands of create, such as namespace, secret, configmap, role, and job. Together these calls build the complete command tree.
In NewCmdCreate, the Run function of the cobra.Command first calls cmdutil.CheckErr(o.Complete(f, cmd)) to fill in the options, then calls cmdutil.CheckErr(o.ValidateArgs(cmd, args)) to validate the arguments, and finally calls cmdutil.CheckErr(o.RunCreate(f, cmd)) to execute the command.
The RunCreate method is at vendor/k8s.io/kubectl/pkg/cmd/create/create.go:227. It builds the request as a chain of calls: obj, err := resource.NewHelper(info.Client, info.Mapping).DryRun(o.DryRunStrategy == cmdutil.DryRunServer).WithFieldManager(o.fieldManager).Create(info.Namespace, true, info.Object). This chain issues the REST call that reaches the server-side API service and performs the create operation.
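The Complete, Validate, Run sequence is a recurring pattern in kubectl commands. The following is a minimal sketch of that pattern, assuming a hypothetical `CreateOptions` struct with only two fields; the real options struct and method signatures are richer.

```go
package main

import (
	"errors"
	"fmt"
)

// CreateOptions is a hypothetical, simplified options struct.
type CreateOptions struct {
	Filenames []string
	Namespace string
}

// Complete fills in values the user did not provide.
func (o *CreateOptions) Complete() error {
	if o.Namespace == "" {
		o.Namespace = "default"
	}
	return nil
}

// Validate rejects option combinations that cannot be executed.
func (o *CreateOptions) Validate() error {
	if len(o.Filenames) == 0 {
		return errors.New("must specify at least one file")
	}
	return nil
}

// Run performs the action; here it only prints what it would do.
func (o *CreateOptions) Run() error {
	for _, f := range o.Filenames {
		fmt.Printf("creating resources from %s in namespace %s\n", f, o.Namespace)
	}
	return nil
}

// execute runs the three phases in order and stops at the first error.
func execute(o *CreateOptions) error {
	for _, step := range []func() error{o.Complete, o.Validate, o.Run} {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	if err := execute(&CreateOptions{Filenames: []string{"pod.yaml"}}); err != nil {
		fmt.Println("error:", err)
	}
	if err := execute(&CreateOptions{}); err != nil {
		fmt.Println("error:", err)
	}
}
```

Separating the phases keeps defaulting and validation testable without touching the cluster.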
- // RunCreate performs the creation
- func (o *CreateOptions) RunCreate(f cmdutil.Factory, cmd *cobra.Command) error {
- // raw only makes sense for a single file resource multiple objects aren't likely to do what you want.
- // the validator enforces this, so
- if len(o.Raw) > 0 {
- restClient, err := f.RESTClient()
- if err != nil {
- return err
- }
- return rawhttp.RawPost(restClient, o.IOStreams, o.Raw, o.FilenameOptions.Filenames[0])
- }
-
- if o.EditBeforeCreate {
- return RunEditOnCreate(f, o.PrintFlags, o.RecordFlags, o.IOStreams, cmd, &o.FilenameOptions, o.fieldManager)
- }
- schema, err := f.Validator(cmdutil.GetFlagBool(cmd, "validate"))
- if err != nil {
- return err
- }
-
- cmdNamespace, enforceNamespace, err := f.ToRawKubeConfigLoader().Namespace()
- if err != nil {
- return err
- }
-
- r := f.NewBuilder().
- Unstructured().
- Schema(schema).
- ContinueOnError().
- NamespaceParam(cmdNamespace).DefaultNamespace().
- FilenameParam(enforceNamespace, &o.FilenameOptions).
- LabelSelectorParam(o.Selector).
- Flatten().
- Do()
- err = r.Err()
- if err != nil {
- return err
- }
-
- count := 0
- err = r.Visit(func(info *resource.Info, err error) error {
- if err != nil {
- return err
- }
- if err := util.CreateOrUpdateAnnotation(cmdutil.GetFlagBool(cmd, cmdutil.ApplyAnnotationsFlag), info.Object, scheme.DefaultJSONEncoder()); err != nil {
- return cmdutil.AddSourceToErr("creating", info.Source, err)
- }
-
- if err := o.Recorder.Record(info.Object); err != nil {
- klog.V(4).Infof("error recording current command: %v", err)
- }
-
- if o.DryRunStrategy != cmdutil.DryRunClient {
- if o.DryRunStrategy == cmdutil.DryRunServer {
- if err := o.DryRunVerifier.HasSupport(info.Mapping.GroupVersionKind); err != nil {
- return cmdutil.AddSourceToErr("creating", info.Source, err)
- }
- }
- obj, err := resource.
- NewHelper(info.Client, info.Mapping).
- DryRun(o.DryRunStrategy == cmdutil.DryRunServer).
- WithFieldManager(o.fieldManager).
- Create(info.Namespace, true, info.Object)
- if err != nil {
- return cmdutil.AddSourceToErr("creating", info.Source, err)
- }
- info.Refresh(obj, true)
- }
-
- count++
-
- return o.PrintObj(info.Object)
- })
- if err != nil {
- return err
- }
- if count == 0 {
- return fmt.Errorf("no objects passed to create")
- }
- return nil
- }
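RunCreate processes every resource through a visitor callback and counts how many objects were created. The sketch below isolates that visitor-and-count logic. `Info`, `Result`, and `createAll` are hypothetical stand-ins for illustration, not the types from the k8s.io/cli-runtime resource package.

```go
package main

import (
	"errors"
	"fmt"
)

// Info is a hypothetical stand-in for resource.Info.
type Info struct {
	Kind, Name string
}

// VisitorFunc is called once per resource; err carries a per-resource error.
type VisitorFunc func(info *Info, err error) error

// Result holds the resources resolved from the input files.
type Result struct {
	infos []*Info
}

// Visit calls fn for every resource and stops at the first error fn returns.
func (r *Result) Visit(fn VisitorFunc) error {
	for _, info := range r.infos {
		if err := fn(info, nil); err != nil {
			return err
		}
	}
	return nil
}

// createAll applies create to every resource and reports how many succeeded.
// It fails when there was nothing to create, as RunCreate does.
func createAll(r *Result, create func(*Info) error) (int, error) {
	count := 0
	err := r.Visit(func(info *Info, err error) error {
		if err != nil {
			return err
		}
		if err := create(info); err != nil {
			return fmt.Errorf("creating %s/%s: %w", info.Kind, info.Name, err)
		}
		count++
		return nil
	})
	if err != nil {
		return count, err
	}
	if count == 0 {
		return 0, errors.New("no objects passed to create")
	}
	return count, nil
}

func main() {
	r := &Result{infos: []*Info{{"Namespace", "demo"}, {"Deployment", "web"}}}
	n, err := createAll(r, func(i *Info) error {
		fmt.Printf("POST %s %s\n", i.Kind, i.Name)
		return nil
	})
	fmt.Println(n, err)
}
```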
The REST call is issued remotely through client-go. The source can be traced to the request method at staging/src/k8s.io/client-go/rest/request.go:826, shown below:
- // request connects to the server and invokes the provided function when a server response is
- // received. It handles retry behavior and up front validation of requests. It will invoke
- // fn at most once. It will return an error if a problem occurred prior to connecting to the
- // server - the provided function is responsible for handling server errors.
- func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
- //Metrics for total request latency
- start := time.Now()
- defer func() {
- metrics.RequestLatency.Observe(r.verb, r.finalURLTemplate(), time.Since(start))
- }()
-
- if r.err != nil {
- klog.V(4).Infof("Error in request: %v", r.err)
- return r.err
- }
-
- if err := r.requestPreflightCheck(); err != nil {
- return err
- }
-
- client := r.c.Client
- if client == nil {
- client = http.DefaultClient
- }
-
- // Throttle the first try before setting up the timeout configured on the
- // client. We don't want a throttled client to return timeouts to callers
- // before it makes a single request.
- if err := r.tryThrottle(ctx); err != nil {
- return err
- }
-
- if r.timeout > 0 {
- var cancel context.CancelFunc
- ctx, cancel = context.WithTimeout(ctx, r.timeout)
- defer cancel()
- }
-
- // Right now we make about ten retry attempts if we get a Retry-After response.
- retries := 0
- for {
-
- url := r.URL().String()
- req, err := http.NewRequest(r.verb, url, r.body)
- if err != nil {
- return err
- }
- req = req.WithContext(ctx)
- req.Header = r.headers
-
- r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
- if retries > 0 {
- // We are retrying the request that we already send to apiserver
- // at least once before.
- // This request should also be throttled with the client-internal rate limiter.
- if err := r.tryThrottle(ctx); err != nil {
- return err
- }
- }
- resp, err := client.Do(req)
- updateURLMetrics(r, resp, err)
- if err != nil {
- r.backoff.UpdateBackoff(r.URL(), err, 0)
- } else {
- r.backoff.UpdateBackoff(r.URL(), err, resp.StatusCode)
- }
- if err != nil {
- // "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
- // Thus in case of "GET" operations, we simply retry it.
- // We are not automatically retrying "write" operations, as
- // they are not idempotent.
- if r.verb != "GET" {
- return err
- }
- // For connection errors and apiserver shutdown errors retry.
- if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
- // For the purpose of retry, we set the artificial "retry-after" response.
- // TODO: Should we clean the original response if it exists?
- resp = &http.Response{
- StatusCode: http.StatusInternalServerError,
- Header: http.Header{"Retry-After": []string{"1"}},
- Body: ioutil.NopCloser(bytes.NewReader([]byte{})),
- }
- } else {
- return err
- }
- }
-
- done := func() bool {
- // Ensure the response body is fully read and closed
- // before we reconnect, so that we reuse the same TCP
- // connection.
- defer func() {
- const maxBodySlurpSize = 2 << 10
- if resp.ContentLength <= maxBodySlurpSize {
- io.Copy(ioutil.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
- }
- resp.Body.Close()
- }()
-
- retries++
- if seconds, wait := checkWait(resp); wait && retries <= r.maxRetries {
- if seeker, ok := r.body.(io.Seeker); ok && r.body != nil {
- _, err := seeker.Seek(0, 0)
- if err != nil {
- klog.V(4).Infof("Could not retry request, can't Seek() back to beginning of body for %T", r.body)
- fn(req, resp)
- return true
- }
- }
-
- klog.V(4).Infof("Got a Retry-After %ds response for attempt %d to %v", seconds, retries, url)
- r.backoff.Sleep(time.Duration(seconds) * time.Second)
- return false
- }
- fn(req, resp)
- return true
- }()
- if done {
- return nil
- }
- }
- }
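The retry behaviour of the request method (retry when the server answers 429 or 5xx with a Retry-After header, up to a maximum number of retries) can be reproduced in a small standalone program. This is a sketch using only the standard library: `getWithRetry`, `retryAfter`, and `newFlakyServer` are hypothetical names, and the backoff manager and client-side rate limiter of client-go are left out.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strconv"
	"sync/atomic"
	"time"
)

// retryAfter reports the wait requested by a 429 or 5xx response that
// carries a Retry-After header expressed in whole seconds.
func retryAfter(resp *http.Response) (time.Duration, bool) {
	if resp.StatusCode != http.StatusTooManyRequests && resp.StatusCode < 500 {
		return 0, false
	}
	secs, err := strconv.Atoi(resp.Header.Get("Retry-After"))
	if err != nil || secs < 0 {
		return 0, false
	}
	return time.Duration(secs) * time.Second, true
}

// getWithRetry issues a GET and retries at most maxRetries times while the
// server asks for it. It returns the final status and the attempts made.
func getWithRetry(client *http.Client, url string, maxRetries int) (int, int, error) {
	attempts := 0
	for {
		attempts++
		resp, err := client.Get(url)
		if err != nil {
			return 0, attempts, err
		}
		// Drain and close the body so the connection can be reused.
		io.Copy(io.Discard, resp.Body)
		resp.Body.Close()

		wait, ok := retryAfter(resp)
		if !ok || attempts > maxRetries {
			return resp.StatusCode, attempts, nil
		}
		time.Sleep(wait)
	}
}

// newFlakyServer returns a test server that answers the first `failures`
// requests with 429 and "Retry-After: 0", then answers 200.
func newFlakyServer(failures int32) *httptest.Server {
	var calls int32
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if atomic.AddInt32(&calls, 1) <= failures {
			w.Header().Set("Retry-After", "0")
			w.WriteHeader(http.StatusTooManyRequests)
			return
		}
		fmt.Fprint(w, "ok")
	}))
}

func main() {
	srv := newFlakyServer(2)
	defer srv.Close()
	status, attempts, err := getWithRetry(srv.Client(), srv.URL, 10)
	fmt.Println(status, attempts, err)
}
```

Only GET is retried here, which matches the reasoning in the source comments: write operations are not idempotent, so they are not retried automatically on connection errors.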
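The URL is built by the URL() method at staging/src/k8s.io/client-go/rest/request.go:468. This method assembles the parameters derived from the command line into the URL path and query string, as shown below: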
- staging/src/k8s.io/client-go/rest/request.go:468
-
- // URL returns the current working URL.
- func (r *Request) URL() *url.URL {
- p := r.pathPrefix
- if r.namespaceSet && len(r.namespace) > 0 {
- p = path.Join(p, "namespaces", r.namespace)
- }
- if len(r.resource) != 0 {
- p = path.Join(p, strings.ToLower(r.resource))
- }
- // Join trims trailing slashes, so preserve r.pathPrefix's trailing slash for backwards compatibility if nothing was changed
- if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 {
- p = path.Join(p, r.resourceName, r.subresource, r.subpath)
- }
-
- finalURL := &url.URL{}
- if r.c.base != nil {
- *finalURL = *r.c.base
- }
- finalURL.Path = p
-
- query := url.Values{}
- for key, values := range r.params {
- for _, value := range values {
- query.Add(key, value)
- }
- }
-
- // timeout is handled specially here.
- if r.timeout != 0 {
- query.Set("timeout", r.timeout.String())
- }
- finalURL.RawQuery = query.Encode()
- return finalURL
- }
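As a worked example of the path assembly in URL(), the sketch below builds the URL for a namespaced resource with the standard library. The `requestParts` struct and `buildURL` function are hypothetical, and the host, API group, and namespace values are made up for illustration.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
	"strings"
)

// requestParts holds hypothetical fields modeled on those read by URL().
type requestParts struct {
	base       string // scheme and host of the apiserver
	pathPrefix string // API group and version prefix
	namespace  string
	resource   string
	name       string
	params     url.Values
}

// buildURL joins the parts in the same order as URL():
// prefix, namespaces/<ns>, lower-cased resource, then resource name.
func buildURL(p requestParts) (string, error) {
	u, err := url.Parse(p.base)
	if err != nil {
		return "", err
	}
	full := p.pathPrefix
	if p.namespace != "" {
		full = path.Join(full, "namespaces", p.namespace)
	}
	if p.resource != "" {
		full = path.Join(full, strings.ToLower(p.resource))
	}
	if p.name != "" {
		full = path.Join(full, p.name)
	}
	u.Path = full
	u.RawQuery = p.params.Encode()
	return u.String(), nil
}

func main() {
	s, err := buildURL(requestParts{
		base:       "https://127.0.0.1:6443",
		pathPrefix: "/apis/apps/v1",
		namespace:  "demo",
		resource:   "Deployments",
		params:     url.Values{"fieldManager": []string{"kubectl-create"}},
	})
	fmt.Println(s, err)
}
```

With these inputs the result is `https://127.0.0.1:6443/apis/apps/v1/namespaces/demo/deployments?fieldManager=kubectl-create`.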
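The kubectl command line is built with cobra.Command, and commands are executed through a set of template methods. The entry point of that execution is the ExecuteC method at vendor/github.com/spf13/cobra/command.go:892. See the following source: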
- // ExecuteC executes the command.
- func (c *Command) ExecuteC() (cmd *Command, err error) {
- if c.ctx == nil {
- c.ctx = context.Background()
- }
-
- // Regardless of what command execute is called on, run on Root only
- if c.HasParent() {
- return c.Root().ExecuteC()
- }
-
- // windows hook
- if preExecHookFn != nil {
- preExecHookFn(c)
- }
-
- // initialize help as the last point possible to allow for user
- // overriding
- c.InitDefaultHelpCmd()
-
- args := c.args
-
- // Workaround FAIL with "go test -v" or "cobra.test -test.v", see #155
- if c.args == nil && filepath.Base(os.Args[0]) != "cobra.test" {
- args = os.Args[1:]
- }
-
- // initialize the hidden command to be used for bash completion
- c.initCompleteCmd(args)
-
- var flags []string
- if c.TraverseChildren {
- cmd, flags, err = c.Traverse(args)
- } else {
- cmd, flags, err = c.Find(args)
- }
- if err != nil {
- // If found parse to a subcommand and then failed, talk about the subcommand
- if cmd != nil {
- c = cmd
- }
- if !c.SilenceErrors {
- c.Println("Error:", err.Error())
- c.Printf("Run '%v --help' for usage.\n", c.CommandPath())
- }
- return c, err
- }
-
- cmd.commandCalledAs.called = true
- if cmd.commandCalledAs.name == "" {
- cmd.commandCalledAs.name = cmd.Name()
- }
-
- // We have to pass global context to children command
- // if context is present on the parent command.
- if cmd.ctx == nil {
- cmd.ctx = c.ctx
- }
-
- err = cmd.execute(flags)
- if err != nil {
- // Always show help if requested, even if SilenceErrors is in
- // effect
- if err == flag.ErrHelp {
- cmd.HelpFunc()(cmd, args)
- return cmd, nil
- }
-
- // If root command has SilentErrors flagged,
- // all subcommands should respect it
- if !cmd.SilenceErrors && !c.SilenceErrors {
- c.Println("Error:", err.Error())
- }
-
- // If root command has SilentUsage flagged,
- // all subcommands should respect it
- if !cmd.SilenceUsage && !c.SilenceUsage {
- c.Println(cmd.UsageString())
- }
- }
- return cmd, err
- }
The template relies on the execute method at vendor/github.com/spf13/cobra/command.go:755, which defines the order in which the hook functions run. The execution order is as follows:
- // The *Run functions are executed in the following order:
- // * PersistentPreRun()
- // * PreRun()
- // * Run()
- // * PostRun()
- // * PersistentPostRun()
- // All functions get the same args, the arguments after the command name.
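The server is called through REST, which relies on Go's underlying net/http package; see the request method at vendor/k8s.io/client-go/rest/request.go:826. The source of the execute method itself follows: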
- func (c *Command) execute(a []string) (err error) {
- if c == nil {
- return fmt.Errorf("Called Execute() on a nil Command")
- }
-
- if len(c.Deprecated) > 0 {
- c.Printf("Command %q is deprecated, %s\n", c.Name(), c.Deprecated)
- }
-
- // initialize help and version flag at the last point possible to allow for user
- // overriding
- c.InitDefaultHelpFlag()
- c.InitDefaultVersionFlag()
-
- err = c.ParseFlags(a)
- if err != nil {
- return c.FlagErrorFunc()(c, err)
- }
-
- // If help is called, regardless of other flags, return we want help.
- // Also say we need help if the command isn't runnable.
- helpVal, err := c.Flags().GetBool("help")
- if err != nil {
- // should be impossible to get here as we always declare a help
- // flag in InitDefaultHelpFlag()
- c.Println("\"help\" flag declared as non-bool. Please correct your code")
- return err
- }
-
- if helpVal {
- return flag.ErrHelp
- }
-
- // for back-compat, only add version flag behavior if version is defined
- if c.Version != "" {
- versionVal, err := c.Flags().GetBool("version")
- if err != nil {
- c.Println("\"version\" flag declared as non-bool. Please correct your code")
- return err
- }
- if versionVal {
- err := tmpl(c.OutOrStdout(), c.VersionTemplate(), c)
- if err != nil {
- c.Println(err)
- }
- return err
- }
- }
-
- if !c.Runnable() {
- return flag.ErrHelp
- }
-
- c.preRun()
-
- argWoFlags := c.Flags().Args()
- if c.DisableFlagParsing {
- argWoFlags = a
- }
-
- if err := c.ValidateArgs(argWoFlags); err != nil {
- return err
- }
-
- for p := c; p != nil; p = p.Parent() {
- if p.PersistentPreRunE != nil {
- if err := p.PersistentPreRunE(c, argWoFlags); err != nil {
- return err
- }
- break
- } else if p.PersistentPreRun != nil {
- p.PersistentPreRun(c, argWoFlags)
- break
- }
- }
- if c.PreRunE != nil {
- if err := c.PreRunE(c, argWoFlags); err != nil {
- return err
- }
- } else if c.PreRun != nil {
- c.PreRun(c, argWoFlags)
- }
-
- if err := c.validateRequiredFlags(); err != nil {
- return err
- }
- if c.RunE != nil {
- if err := c.RunE(c, argWoFlags); err != nil {
- return err
- }
- } else {
- c.Run(c, argWoFlags)
- }
- if c.PostRunE != nil {
- if err := c.PostRunE(c, argWoFlags); err != nil {
- return err
- }
- } else if c.PostRun != nil {
- c.PostRun(c, argWoFlags)
- }
- for p := c; p != nil; p = p.Parent() {
- if p.PersistentPostRunE != nil {
- if err := p.PersistentPostRunE(c, argWoFlags); err != nil {
- return err
- }
- break
- } else if p.PersistentPostRun != nil {
- p.PersistentPostRun(c, argWoFlags)
- break
- }
- }
-
- return nil
- }
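The hook order, including the rule that only the nearest ancestor's persistent hook runs, can be checked with a small simulation. `Cmd`, `nearest`, and `trace` below are hypothetical names; the sketch omits flag parsing, argument validation, and the error-returning hook variants.

```go
package main

import "fmt"

// Cmd is a hypothetical reduction of cobra.Command to its run hooks.
type Cmd struct {
	Name   string
	Parent *Cmd

	PersistentPreRun, PreRun, Run, PostRun, PersistentPostRun func(c *Cmd)
}

// nearest returns the first non-nil hook found while walking from c up to
// the root, mirroring the "break" in the loops of execute.
func nearest(c *Cmd, pick func(*Cmd) func(*Cmd)) func(*Cmd) {
	for p := c; p != nil; p = p.Parent {
		if h := pick(p); h != nil {
			return h
		}
	}
	return nil
}

// execute runs the hooks in the documented order, skipping unset ones.
func (c *Cmd) execute() {
	hooks := []func(*Cmd){
		nearest(c, func(p *Cmd) func(*Cmd) { return p.PersistentPreRun }),
		c.PreRun,
		c.Run,
		c.PostRun,
		nearest(c, func(p *Cmd) func(*Cmd) { return p.PersistentPostRun }),
	}
	for _, h := range hooks {
		if h != nil {
			h(c)
		}
	}
}

// trace executes a child command under a root that defines the persistent
// hooks and returns the labels of the hooks in the order they ran.
func trace() []string {
	var order []string
	rec := func(label string) func(*Cmd) {
		return func(*Cmd) { order = append(order, label) }
	}
	root := &Cmd{
		Name:              "kubectl",
		PersistentPreRun:  rec("kubectl.PersistentPreRun"),
		PersistentPostRun: rec("kubectl.PersistentPostRun"),
	}
	child := &Cmd{
		Name:    "create",
		Parent:  root,
		PreRun:  rec("create.PreRun"),
		Run:     rec("create.Run"),
		PostRun: rec("create.PostRun"),
	}
	child.execute()
	return order
}

func main() {
	for i, label := range trace() {
		fmt.Println(i+1, label)
	}
}
```

This is why the PersistentPreRunE and PersistentPostRunE functions defined on the root kubectl command wrap the execution of every subcommand that does not define its own.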
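The previous section showed how the kubectl client issues a command that is eventually converted into an HTTP REST request and triggers a service call. How does the server receive and handle that request? On the server side, the apiserver receives the calls from the kubectl client and runs the corresponding business logic. The overall server framework is built as described in the following sections.
Entry point of the server-side apiserver: the main() function at cmd/kube-apiserver/apiserver.go:32, which starts the apiserver. The entry-point source is as follows: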
- func main() {
- rand.Seed(time.Now().UnixNano())
-
- command := app.NewAPIServerCommand()
-
- // TODO: once we switch everything over to Cobra commands, we can go back to calling
- // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
- // normalize func and add the go flag set by hand.
- // utilflag.InitFlags()
- logs.InitLogs()
- defer logs.FlushLogs()
-
- if err := command.Execute(); err != nil {
- os.Exit(1)
- }
- }
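Following the code layer by layer shows that, to start the APIServer, the command first creates the server chain, then prepares to run, and finally starts the server. The entry point is the Run() function at cmd/kube-apiserver/app/server.go:161. The source is as follows: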
- cmd/kube-apiserver/app/server.go:161
- // Run runs the specified APIServer. This should never exit.
- func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
- // To help debugging, immediately log version
- klog.Infof("Version: %+v", version.Get())
-
- server, err := CreateServerChain(completeOptions, stopCh)
- if err != nil {
- return err
- }
-
- prepared, err := server.PrepareRun()
- if err != nil {
- return err
- }
-
- return prepared.Run(stopCh)
- }
-
-
- // Digging further down leads to the following code:
- staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go:316
-
- // Run spawns the secure http server. It only returns if stopCh is closed
- // or the secure port cannot be listened on initially.
- func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
- delayedStopCh := make(chan struct{})
-
- go func() {
- defer close(delayedStopCh)
-
- <-stopCh
-
- // As soon as shutdown is initiated, /readyz should start returning failure.
- // This gives the load balancer a window defined by ShutdownDelayDuration to detect that /readyz is red
- // and stop sending traffic to this server.
- close(s.readinessStopCh)
-
- time.Sleep(s.ShutdownDelayDuration)
- }()
-
- // close socket after delayed stopCh
- stoppedCh, err := s.NonBlockingRun(delayedStopCh)
- if err != nil {
- return err
- }
-
- <-stopCh
-
- // run shutdown hooks directly. This includes deregistering from the kubernetes endpoint in case of kube-apiserver.
- err = s.RunPreShutdownHooks()
- if err != nil {
- return err
- }
-
- // wait for the delayed stopCh before closing the handler chain (it rejects everything after Wait has been called).
- <-delayedStopCh
- // wait for stoppedCh that is closed when the graceful termination (server.Shutdown) is finished.
- <-stoppedCh
-
- // Wait for all requests to finish, which are bounded by the RequestTimeout variable.
- s.HandlerChainWaitGroup.Wait()
-
- return nil
- }
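The shutdown sequence in Run (fail readiness first, keep serving for ShutdownDelayDuration, then stop the listener) depends on a second, delayed stop channel. The following sketch reproduces only that ordering with plain channels. `runWithDelay`, `eventLog`, and `shutdownOrder` are hypothetical names, and no real HTTP server is involved.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// eventLog records events in order and is safe for concurrent use.
type eventLog struct {
	mu     sync.Mutex
	events []string
}

func (l *eventLog) add(e string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.events = append(l.events, e)
}

func (l *eventLog) snapshot() []string {
	l.mu.Lock()
	defer l.mu.Unlock()
	return append([]string(nil), l.events...)
}

// runWithDelay blocks until stopCh is closed. Readiness is withdrawn at
// once, while the "listener" keeps running until the delay has elapsed.
func runWithDelay(stopCh <-chan struct{}, delay time.Duration, log *eventLog) {
	readinessStopCh := make(chan struct{})
	delayedStopCh := make(chan struct{})

	go func() {
		defer close(delayedStopCh)
		<-stopCh
		close(readinessStopCh) // readiness probes start failing here
		log.add("not-ready")
		time.Sleep(delay) // window for load balancers to drain traffic
	}()

	// Stand-in for the serving loop: it stops only on the delayed signal.
	stoppedCh := make(chan struct{})
	go func() {
		defer close(stoppedCh)
		<-delayedStopCh
		log.add("listener-closed")
	}()

	<-stopCh
	<-delayedStopCh
	<-stoppedCh
	log.add("returned")
}

// shutdownOrder requests shutdown immediately and returns the event order.
func shutdownOrder(delayMillis int) []string {
	log := &eventLog{}
	stopCh := make(chan struct{})
	close(stopCh)
	runWithDelay(stopCh, time.Duration(delayMillis)*time.Millisecond, log)
	return log.snapshot()
}

func main() {
	fmt.Println(shutdownOrder(20))
}
```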
Digging further into the source leads to the entry point that starts the HTTP server, the NonBlockingRun() method at staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go:360:
- // NonBlockingRun spawns the secure http server. An error is
- // returned if the secure port cannot be listened on.
- // The returned channel is closed when the (asynchronous) termination is finished.
- func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan struct{}, error) {
- // Use an stop channel to allow graceful shutdown without dropping audit events
- // after http server shutdown.
- auditStopCh := make(chan struct{})
-
- // Start the audit backend before any request comes in. This means we must call Backend.Run
- // before http server start serving. Otherwise the Backend.ProcessEvents call might block.
- if s.AuditBackend != nil {
- if err := s.AuditBackend.Run(auditStopCh); err != nil {
- return nil, fmt.Errorf("failed to run the audit backend: %v", err)
- }
- }
-
- // Use an internal stop channel to allow cleanup of the listeners on error.
- internalStopCh := make(chan struct{})
- var stoppedCh <-chan struct{}
- if s.SecureServingInfo != nil && s.Handler != nil {
- var err error
- stoppedCh, err = s.SecureServingInfo.Serve(s.Handler, s.ShutdownTimeout, internalStopCh)
- if err != nil {
- close(internalStopCh)
- close(auditStopCh)
- return nil, err
- }
- }
-
- // Now that listener have bound successfully, it is the
- // responsibility of the caller to close the provided channel to
- // ensure cleanup.
- go func() {
- <-stopCh
- close(internalStopCh)
- if stoppedCh != nil {
- <-stoppedCh
- }
- s.HandlerChainWaitGroup.Wait()
- close(auditStopCh)
- }()
-
- s.RunPostStartHooks(stopCh)
-
- if _, err := systemd.SdNotify(true, "READY=1\n"); err != nil {
- klog.Errorf("Unable to send systemd daemon successful start message: %v\n", err)
- }
-
- return stoppedCh, nil
- }
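The HTTP server is started by the Serve() method at staging/src/k8s.io/apiserver/pkg/server/secure_serving.go:147, shown below. At the end, this method calls RunServer() to start the HTTP server, which listens for requests and handles the corresponding resources.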
- // Serve runs the secure http server. It fails only if certificates cannot be loaded or the initial listen call fails.
- // The actual server loop (stoppable by closing stopCh) runs in a go routine, i.e. Serve does not block.
- // It returns a stoppedCh that is closed when all non-hijacked active requests have been processed.
- func (s *SecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) (<-chan struct{}, error) {
- if s.Listener == nil {
- return nil, fmt.Errorf("listener must not be nil")
- }
-
- tlsConfig, err := s.tlsConfig(stopCh)
- if err != nil {
- return nil, err
- }
-
- secureServer := &http.Server{
- Addr: s.Listener.Addr().String(),
- Handler: handler,
- MaxHeaderBytes: 1 << 20,
- TLSConfig: tlsConfig,
- }
-
- // At least 99% of serialized resources in surveyed clusters were smaller than 256kb.
- // This should be big enough to accommodate most API POST requests in a single frame,
- // and small enough to allow a per connection buffer of this size multiplied by `MaxConcurrentStreams`.
- const resourceBody99Percentile = 256 * 1024
-
- http2Options := &http2.Server{}
-
- // shrink the per-stream buffer and max framesize from the 1MB default while still accommodating most API POST requests in a single frame
- http2Options.MaxUploadBufferPerStream = resourceBody99Percentile
- http2Options.MaxReadFrameSize = resourceBody99Percentile
-
- // use the overridden concurrent streams setting or make the default of 250 explicit so we can size MaxUploadBufferPerConnection appropriately
- if s.HTTP2MaxStreamsPerConnection > 0 {
- http2Options.MaxConcurrentStreams = uint32(s.HTTP2MaxStreamsPerConnection)
- } else {
- http2Options.MaxConcurrentStreams = 250
- }
-
- // increase the connection buffer size from the 1MB default to handle the specified number of concurrent streams
- http2Options.MaxUploadBufferPerConnection = http2Options.MaxUploadBufferPerStream * int32(http2Options.MaxConcurrentStreams)
-
- if !s.DisableHTTP2 {
- // apply settings to the server
- if err := http2.ConfigureServer(secureServer, http2Options); err != nil {
- return nil, fmt.Errorf("error configuring http2: %v", err)
- }
- }
-
- // use tlsHandshakeErrorWriter to handle messages of tls handshake error
- tlsErrorWriter := &tlsHandshakeErrorWriter{os.Stderr}
- tlsErrorLogger := log.New(tlsErrorWriter, "", 0)
- secureServer.ErrorLog = tlsErrorLogger
-
- klog.Infof("Serving securely on %s", secureServer.Addr)
- return RunServer(secureServer, s.Listener, shutdownTimeout, stopCh)
- }
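The source of RunServer is not shown in this article. The sketch below is an assumption about its general shape, based only on the comments of Serve: the server loop runs in a goroutine, closing stopCh triggers a graceful shutdown, and the returned channel is closed once shutdown has finished. `runServer` and `probe` are hypothetical names, and the example serves plain HTTP on a loopback port instead of TLS.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"time"
)

// runServer serves on ln in a goroutine and returns immediately. Closing
// stopCh starts a graceful shutdown bounded by shutdownTimeout; the
// returned channel is closed when that shutdown has completed.
func runServer(srv *http.Server, ln net.Listener, shutdownTimeout time.Duration, stopCh <-chan struct{}) <-chan struct{} {
	stoppedCh := make(chan struct{})
	go func() {
		defer close(stoppedCh)
		<-stopCh
		ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
		defer cancel()
		srv.Shutdown(ctx)
	}()
	go func() {
		// Serve returns http.ErrServerClosed after a call to Shutdown.
		if err := srv.Serve(ln); err != nil && err != http.ErrServerClosed {
			fmt.Println("server stopped unexpectedly:", err)
		}
	}()
	return stoppedCh
}

// probe starts a server on an ephemeral loopback port, sends one request,
// shuts the server down, and returns the response body.
func probe() (string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	srv := &http.Server{Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		io.WriteString(w, "ok")
	})}
	stopCh := make(chan struct{})
	stoppedCh := runServer(srv, ln, time.Second, stopCh)

	resp, err := http.Get("http://" + ln.Addr().String() + "/healthz")
	if err != nil {
		close(stopCh)
		<-stoppedCh
		return "", err
	}
	body, err := io.ReadAll(resp.Body)
	resp.Body.Close()

	close(stopCh)
	<-stoppedCh
	return string(body), err
}

func main() {
	fmt.Println(probe())
}
```

Binding the listener before calling the function, as Serve does with s.Listener, means a port conflict is reported synchronously rather than from inside the goroutine.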
In the Run function from section 3.2 above, CreateServerChain builds the whole request-handling structure of the APIServer. It calls CreateKubeAPIServer(), which invokes New via kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer). That in turn calls s, err := c.GenericConfig.New("kube-apiserver", delegationTarget), and inside it NewAPIServerHandler sets up the routing, namely: gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
For the source, see NewAPIServerHandler() at vendor/k8s.io/apiserver/pkg/server/handler.go:73, shown below:
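The Complete().New(...) call chain uses the type system to make sure a server can only be built from a configuration that has been completed. The sketch below shows the idea in its smallest form. All type names are hypothetical, and the default port is only an illustrative value.

```go
package main

import (
	"errors"
	"fmt"
)

// Config is a hypothetical raw configuration as supplied by the caller.
type Config struct {
	BindAddress string
	SecurePort  int
}

// completedConfig wraps a Config whose defaults have been filled in.
// New is defined only on this type, so callers must go through Complete.
type completedConfig struct {
	Config
}

// Complete fills in defaults and returns the completed wrapper.
func (c Config) Complete() completedConfig {
	if c.BindAddress == "" {
		c.BindAddress = "0.0.0.0"
	}
	if c.SecurePort == 0 {
		c.SecurePort = 6443 // illustrative default
	}
	return completedConfig{c}
}

// Server is a hypothetical server built from a completed configuration.
type Server struct {
	Name string
	Addr string
}

// New builds a server; name is used to tell servers apart in logs.
func (c completedConfig) New(name string) (*Server, error) {
	if name == "" {
		return nil, errors.New("server name must not be empty")
	}
	return &Server{Name: name, Addr: fmt.Sprintf("%s:%d", c.BindAddress, c.SecurePort)}, nil
}

func main() {
	s, err := Config{}.Complete().New("kube-apiserver")
	fmt.Println(s, err)
}
```

Calling `Config{}.New(...)` directly does not compile because Config has no New method, which is exactly the ordering guarantee the pattern is meant to give.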
- cmd/kube-apiserver/app/server.go:228
-
- // CreateKubeAPIServer creates and wires a workable kube-apiserver
- func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
- kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
- if err != nil {
- return nil, err
- }
-
- return kubeAPIServer, nil
- }
-
- vendor/k8s.io/apiserver/pkg/server/config.go:522
- // New creates a new server which logically combines the handling chain with the passed server.
- // name is used to differentiate for logging. The handler chain in particular can be difficult as it starts delegating.
- // delegationTarget may not be nil.
- func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
- ..........
- apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())
- ..........
- }
-
- vendor/k8s.io/apiserver/pkg/server/handler.go:73
- func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
- nonGoRestfulMux := mux.NewPathRecorderMux(name)
- if notFoundHandler != nil {
- nonGoRestfulMux.NotFoundHandler(notFoundHandler)
- }
-
- gorestfulContainer := restful.NewContainer()
- gorestfulContainer.ServeMux = http.NewServeMux()
- gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
- gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
- logStackOnRecover(s, panicReason, httpWriter)
- })
- gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
- serviceErrorHandler(s, serviceErr, request, response)
- })
-
- director := director{
- name: name,
- goRestfulContainer: gorestfulContainer,
- nonGoRestfulMux: nonGoRestfulMux,
- }
-
- return &APIServerHandler{
- FullHandlerChain: handlerChainBuilder(director),
- GoRestfulContainer: gorestfulContainer,
- NonGoRestfulMux: nonGoRestfulMux,
- Director: director,
- }
- }
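NewAPIServerHandler wires a go-restful container and a non-go-restful mux behind a single director. The sketch below shows one way such a director could choose between them, using only the standard library. The dispatch rule (a prefix match against the registered root paths, falling back to the other mux) is an assumption, since the director's ServeHTTP is not shown in this article. `sketchDirector`, `newSketchDirector` and `handledBy` are hypothetical names.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// sketchDirector is a hypothetical stand-in for the director type above.
// restfulRoots plays the role of the root paths of the WebServices that are
// registered in the go-restful container.
type sketchDirector struct {
	restfulRoots []string
	restful      http.Handler // stands in for goRestfulContainer
	fallback     http.Handler // stands in for nonGoRestfulMux
}

// ServeHTTP sends the request to the restful handler when the path falls
// under a registered root, and to the fallback handler otherwise.
func (d sketchDirector) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	for _, root := range d.restfulRoots {
		if r.URL.Path == root || strings.HasPrefix(r.URL.Path, root+"/") {
			d.restful.ServeHTTP(w, r)
			return
		}
	}
	d.fallback.ServeHTTP(w, r)
}

// newSketchDirector builds a director whose handlers simply report their name.
func newSketchDirector() sketchDirector {
	return sketchDirector{
		restfulRoots: []string{"/api", "/apis"},
		restful: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprint(w, "restful")
		}),
		fallback: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprint(w, "fallback")
		}),
	}
}

// handledBy issues an in-memory GET and returns the name of the handler that answered.
func handledBy(h http.Handler, path string) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Body.String()
}

func main() {
	d := newSketchDirector()
	for _, p := range []string{"/api/v1/pods", "/healthz"} {
		fmt.Printf("%s -> %s\n", p, handledBy(d, p))
	}
}
```

In this sketch, paths under /api or /apis reach the restful handler, while a path such as /healthz falls through to the fallback handler.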
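Each REST request is handled by a WebService. The router loops over the registered WebService objects, scores each one against the request path, and hands the request to the best match. See CurlyRouter.SelectRoute() at vendor/github.com/emicklei/go-restful/curly.go:19; the source is as follows: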
- vendor/github.com/emicklei/go-restful/curly.go:19
-
- // SelectRoute is part of the Router interface and returns the best match
- // for the WebService and its Route for the given Request.
- func (c CurlyRouter) SelectRoute(
- webServices []*WebService,
- httpRequest *http.Request) (selectedService *WebService, selected *Route, err error) {
-
- requestTokens := tokenizePath(httpRequest.URL.Path)
-
- detectedService := c.detectWebService(requestTokens, webServices)
- if detectedService == nil {
- if trace {
- traceLogger.Printf("no WebService was found to match URL path:%s\n", httpRequest.URL.Path)
- }
- return nil, nil, NewError(http.StatusNotFound, "404: Page Not Found")
- }
- candidateRoutes := c.selectRoutes(detectedService, requestTokens)
- if len(candidateRoutes) == 0 {
- if trace {
- traceLogger.Printf("no Route in WebService with path %s was found to match URL path:%s\n", detectedService.rootPath, httpRequest.URL.Path)
- }
- return detectedService, nil, NewError(http.StatusNotFound, "404: Page Not Found")
- }
- selectedRoute, err := c.detectRoute(candidateRoutes, httpRequest)
- if selectedRoute == nil {
- return detectedService, nil, err
- }
- return detectedService, selectedRoute, nil
- }
-
-
- vendor/github.com/emicklei/go-restful/curly.go:122
- // detectWebService returns the best matching webService given the list of path tokens.
- // see also computeWebserviceScore
- func (c CurlyRouter) detectWebService(requestTokens []string, webServices []*WebService) *WebService {
- var best *WebService
- score := -1
- for _, each := range webServices {
- matches, eachScore := c.computeWebserviceScore(requestTokens, each.pathExpr.tokens)
- if matches && (eachScore > score) {
- best = each
- score = eachScore
- }
- }
- return best
- }
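detectWebService keeps the highest-scoring WebService, but computeWebserviceScore itself is not shown here. The sketch below illustrates the idea with a simplified scoring rule that is an assumption rather than the library's exact formula: a literal token that matches adds 10, a `{param}` token adds 1, and a literal mismatch rejects the candidate. `splitPath`, `score` and `bestRoot` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// splitPath is a simplified stand-in for tokenizePath: it splits a URL path
// into its non-empty segments.
func splitPath(path string) []string {
	trimmed := strings.Trim(path, "/")
	if trimmed == "" {
		return nil
	}
	return strings.Split(trimmed, "/")
}

// score reports whether rootTokens can match the start of requestTokens and
// how strong the match is. Simplified rule (an assumption): a matching
// literal adds 10, a {param} token adds 1.
func score(requestTokens, rootTokens []string) (bool, int) {
	if len(rootTokens) > len(requestTokens) {
		return false, 0
	}
	total := 0
	for i, tok := range rootTokens {
		if strings.HasPrefix(tok, "{") {
			total++
			continue
		}
		if tok != requestTokens[i] {
			return false, total
		}
		total += 10
	}
	return true, total
}

// bestRoot mirrors the loop in detectWebService: keep the matching root path
// with the highest score, or return "" when nothing matches.
func bestRoot(path string, roots []string) string {
	best, bestScore := "", -1
	req := splitPath(path)
	for _, root := range roots {
		if ok, s := score(req, splitPath(root)); ok && s > bestScore {
			best, bestScore = root, s
		}
	}
	return best
}

func main() {
	roots := []string{"/api", "/apis/apps/v1", "/apis/{group}/{version}"}
	fmt.Println(bestRoot("/apis/apps/v1/deployments", roots))
	fmt.Println(bestRoot("/apis/batch/v1/jobs", roots))
}
```

With these roots, the first request selects /apis/apps/v1 because three literal matches outscore one literal plus two parameters, and the second request falls back to the parameterized root.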
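As for the relationship between WebService and Container: the Add() method at vendor/github.com/emicklei/go-restful/container.go:88 adds a WebService to a Container object. Exactly when each WebService is added to the Container during server startup still needs further source analysis. The source comment on the WebService struct describes it as follows: WebService holds a collection of Route values that bind a Http Method + URL Path to a function.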
- vendor/github.com/emicklei/go-restful/container.go:88
- // Add a WebService to the Container. It will detect duplicate root paths and exit in that case.
- func (c *Container) Add(service *WebService) *Container {
- c.webServicesLock.Lock()
- defer c.webServicesLock.Unlock()
-
- // if rootPath was not set then lazy initialize it
- if len(service.rootPath) == 0 {
- service.Path("/")
- }
-
- // cannot have duplicate root paths
- for _, each := range c.webServices {
- if each.RootPath() == service.RootPath() {
- log.Printf("WebService with duplicate root path detected:['%v']", each)
- os.Exit(1)
- }
- }
-
- // If not registered on root then add specific mapping
- if !c.isRegisteredOnRoot {
- c.isRegisteredOnRoot = c.addHandler(service, c.ServeMux)
- }
- c.webServices = append(c.webServices, service)
- return c
- }
pkg/kubelet/cri/streaming/server.go:108 NewServer
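The sections above used kubectl create as an example to walk through the key steps on the client side: kubectl builds a Command, which then sends a request to the server through client-go's REST client. They then covered the apiserver startup flow on the server side, WebService registration, and the routing chain for HTTP requests, which completes the end-to-end path. How the WebServices are built and how the subsequent service logic runs will be covered in later posts.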
https://blog.csdn.net/u014458692/article/details/108068088
https://kubernetes.io/zh/docs/concepts/architecture/cloud-controller/
https://www.cnblogs.com/waken-captain/p/10509705.html