当前位置:   article > 正文

Kubernetes 源码分析 -- API Server之API安装_apiserver proxy handler pod

apiserver proxy handler pod

前言

API Server的启动中,我们直到介绍了三种服务Master、CustomResourceDefinitions、Aggrator的创建,但是具体的API的创建部分没有介绍。
本文旨在把这块说清楚,让我们了解整个API Server对外提供了哪些API,这些API是在怎样被注册到服务中去的。我们知道,API Server对外提供的Http/Https服务,这都是基于go http服务框架来实现。

API Server支持的HTTP服务采用了go-restful与非go-restful混和的方式,主要是因为go-restful的一些标准无法完全满足需求,而且API Server要兼容旧版本的需要,所以引入了这种混杂模式。

go-restful

go-restful是第三方的REST框架,在GitHub上有多个贡献者,采用了“路由”映射的设计思想,并且在API设计中使用了流行的Fluent Style风格,试用起来酣畅淋漓,也难怪Kubernetes选择了它。下面是go-restful的优良特性。

  • Ruby on Rails风格的Rest路由映射,例如/people/{person_id}/groups/{group_id}。
  • 大大简化了Rest API的开发工作。
  • 底层实现采用Golang的HTTP协议栈,几乎没有限制。
  • 拥有完整的单元包代码,很容易开发一个可测试的Rest API。
  • Google AppEngine ready。

go-restful框架中的核心对象如下:

  • restful.Container:代表了一个HTTP Rest服务器,包括一组restful.WebService对象和一个http.ServeMux对象,使用RouteSelector进行请求派发。
  • restful.WebService:标识一个Rest服务,由多个Rest路由(restful.Route)组成,这一组Rest路由共享同一个RootPath。
  • restful.Route:标识一个Rest路由,Rest路由主要由Rest Path、HTTP Method、输入输出类型(HTML/JSON)及对应的回调函数restful.RouteFunction组成。
  • restful.RouteFunction:一个用于处理具体的REST调用的函数接口定义,具体定义为type RouteFunction func(*Request, *Response)。

服务链

服务链的核心是DelegationTarget接口,它让API Server可以实现链式服务,当有HTTP请求到来时,优先让链首去处理URI,如果能够匹配成功就处理,否则交给下一链,一直到链尾。DelegationTarget的定义如下:

  1. type DelegationTarget interface {
  2. // UnprotectedHandler returns a handler that is NOT protected by a normal chain
  3. UnprotectedHandler() http.Handler
  4. // RequestContextMapper returns the existing RequestContextMapper. Because we cannot rewire all existing
  5. // uses of this function, this will be used in any delegating API server
  6. RequestContextMapper() apirequest.RequestContextMapper
  7. // PostStartHooks returns the post-start hooks that need to be combined
  8. PostStartHooks() map[string]postStartHookEntry
  9. // PreShutdownHooks returns the pre-stop hooks that need to be combined
  10. PreShutdownHooks() map[string]preShutdownHookEntry
  11. // HealthzChecks returns the healthz checks that need to be combined
  12. HealthzChecks() []healthz.HealthzChecker
  13. // ListedPaths returns the paths for supporting an index
  14. ListedPaths() []string
  15. // NextDelegate returns the next delegationTarget in the chain of delegations
  16. NextDelegate() DelegationTarget
  17. }
  • 它有一个空的实现emptyDelegate,一般作为链尾,由于是空的实现,所以具体的定义就不列举了。
  • 它的另一个实现是GenericAPIServer,如下所示:
  1. type GenericAPIServer struct {
  2. ......
  3. // delegationTarget is the next delegate in the chain or nil
  4. delegationTarget DelegationTarget
  5. // HandlerChainWaitGroup allows you to wait for all chain handlers finish after the server shutdown.
  6. HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup

GenericAPIServer定义了一个delegationTarget成员,在API Server整套系统中,总共有三个服务,出了链尾,都是指向GenericAPIServer实例,该成员让GenericAPIServer实现了一套链的功能。

三种服务

APIServer最终提供链式服务把基本的API Server、CustomResource、Aggregator这三种服务采用链式结构串联起来,对外提供服务。这种链式服务为API Server的可扩展性提供了基础,使增添新的服务功能,不会影响到现有的框架,只需要追加新的服务,放到链中就能够实现。而GenericAPIServer是三种服务的基础,在这三种服务中:
Master是API Server的基础服务,它提供的基础资源的API服务;
其他两种CustomResourceDefinitions(简称CRD)、API Server Aggregation(简称AA),提供了自定义资源的能力。具体见自定义资源

我们看一下对应的三种服务的定义:

基础的API Server:代码在k8s.io/kubernetes/pkg/master/master.go中

Master的结构定义如下所示:

  1. type Master struct {
  2. GenericAPIServer *genericapiserver.GenericAPIServer
  3. ClientCARegistrationHook ClientCARegistrationHook
  4. }

在Master实例的构建过程中,就完成了API的Install,具体代码见:completedConfig.New,在这里完成了传统API的安装以及新的资源API的安装。在早期的Kubernetes API Server的代码中,只有Master这一种服务,所以一些常见的资源如pods、service等等,都是基于传统的方式安装到REST中,而随着k8s的发展,涌现出了多种资源,而且他们的版本号也不在是v1版本,如:tokenreviews、horizontalpodautoscalers、jobs等等。所以在Master的Install API中,我们可以到InstallAPIS和InstallLegacyAPI。

  • InstallLegacyAPI

传统的API都是一些核心资源,他们的GroupName="",版本都是v1,所以,统一处于一个APIGroupInfo中,相关代码不一一贴出来了,主要的代码逻辑是,创建各种核心资源的REST对象最终组装成
map[string]rest.Storage对象。

  1. restStorageMap := map[string]rest.Storage{
  2. "pods": podStorage.Pod,
  3. "pods/attach": podStorage.Attach,
  4. "pods/status": podStorage.Status,
  5. "pods/log": podStorage.Log,
  6. "pods/exec": podStorage.Exec,
  7. "pods/portforward": podStorage.PortForward,
  8. "pods/proxy": podStorage.Proxy,
  9. "pods/binding": podStorage.Binding,
  10. "bindings": podStorage.Binding,
  11. "podTemplates": podTemplateStorage,
  12. "replicationControllers": controllerStorage.Controller,
  13. "replicationControllers/status": controllerStorage.Status,
  14. "services": serviceRest.Service,
  15. "services/proxy": serviceRest.Proxy,
  16. "services/status": serviceStatusStorage,
  17. "endpoints": endpointsStorage,
  18. "nodes": nodeStorage.Node,
  19. "nodes/status": nodeStorage.Status,
  20. "nodes/proxy": nodeStorage.Proxy,
  21. "events": eventStorage,
  22. "limitRanges": limitRangeStorage,
  23. "resourceQuotas": resourceQuotaStorage,
  24. "resourceQuotas/status": resourceQuotaStatusStorage,
  25. "namespaces": namespaceStorage,
  26. "namespaces/status": namespaceStatusStorage,
  27. "namespaces/finalize": namespaceFinalizeStorage,
  28. "secrets": secretStorage,
  29. "serviceAccounts": serviceAccountStorage,
  30. "persistentVolumes": persistentVolumeStorage,
  31. "persistentVolumes/status": persistentVolumeStatusStorage,
  32. "persistentVolumeClaims": persistentVolumeClaimStorage,
  33. "persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
  34. "configMaps": configMapStorage,
  35. "componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
  36. }
  37. if legacyscheme.Registry.IsEnabledVersion(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
  38. restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
  39. }
  40. if legacyscheme.Registry.IsEnabledVersion(schema.GroupVersion{Group: "policy", Version: "v1beta1"}) {
  41. restStorageMap["pods/eviction"] = podStorage.Eviction
  42. }

这个数据结构map[string]rest.Storage是key是REST的path,所以很明显它是API暴漏的关键,我们在后面也会陆续讲到,请参考Installer章节。

  • 现代的API
    其实后面的CRD与AA都是采用这种模式,但是Master中封装的更好。它对各种新的资源实现了相应的RESTStorageProvider,接口定义如下:
  1. type RESTStorageProvider interface {
  2. GroupName() string
  3. NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool)
  4. }

RESTStorageProvider实现为REST storage的工厂,这样每个资源实现自己的NewRESTStorage方法,并构建相应的APIGroupInfo。相应的资源为:

  1. restStorageProviders := []RESTStorageProvider{
  2. authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authenticator},
  3. authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
  4. autoscalingrest.RESTStorageProvider{},
  5. batchrest.RESTStorageProvider{},
  6. certificatesrest.RESTStorageProvider{},
  7. extensionsrest.RESTStorageProvider{},
  8. networkingrest.RESTStorageProvider{},
  9. policyrest.RESTStorageProvider{},
  10. rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorizer},
  11. schedulingrest.RESTStorageProvider{},
  12. settingsrest.RESTStorageProvider{},
  13. storagerest.RESTStorageProvider{},
  14. // keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
  15. // See https://github.com/kubernetes/kubernetes/issues/42392
  16. appsrest.RESTStorageProvider{},
  17. admissionregistrationrest.RESTStorageProvider{},
  18. eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
  19. }

自定义资源服务:代码在k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go中

自定义资源服务的结构代码如下:

  1. type CustomResourceDefinitions struct {
  2. GenericAPIServer *genericapiserver.GenericAPIServer
  3. // provided for easier embedding
  4. Informers internalinformers.SharedInformerFactory
  5. }

自定义资源服务的API的安装也是在completedConfig.New中完成,API较少,相关的代码如下所示:

  1. apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Registry, Scheme, metav1.ParameterCodec, Codecs)
  2. if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
  3. apiGroupInfo.GroupMeta.GroupVersion = v1beta1.SchemeGroupVersion
  4. storage := map[string]rest.Storage{}
  5. // customresourcedefinitions
  6. customResourceDefintionStorage := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
  7. storage["customresourcedefinitions"] = customResourceDefintionStorage
  8. storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefintionStorage)
  9. apiGroupInfo.VersionedResourcesStorageMap["v1beta1"] = storage
  10. }
  11. // 安装到ApiServerHandler的GoRestfulContainer,基于Go-restful框架的服务
  12. // 提供了customresourcedefinitions资源的服务能力
  13. if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
  14. return nil, err
  15. }
  16. ......
  17. crdHandler := NewCustomResourceDefinitionHandler(
  18. versionDiscoveryHandler,
  19. groupDiscoveryHandler,
  20. s.GenericAPIServer.RequestContextMapper(),
  21. s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
  22. delegateHandler,
  23. c.ExtraConfig.CRDRESTOptionsGetter,
  24. c.GenericConfig.AdmissionControl,
  25. )
  26. // 安装到ApiServerHandler的NonGoRestfulMux,基于go-http普通框架的服务
  27. // crdHandler提供自定义资源的API功能,主要包括资源的以下操作:
  28. // get/list/watch/create/udpate/patch/delete/deletecollection
  29. s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
  30. s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)

在New方法中,除了上述API的安装外,还有crd、naming、finalizing控制器以及各种Informers,有必要去看待吗,这里不列举。

AggregateServer :代码在k8s.io/kube-aggregator/pkg/apiserver/apiserver.go中

主K8S API Server处理built-in资源,如Pods和Services等等,而CRD能够让我们实现了一些通用的自定义资源。
AggregateServer实现了API Server Aggregation功能,它让我们可以提供自定义资源的特殊实现,并且部署我们独立API Server,主API Server把请求代理给独立的API Server来处理自定义资源,从而让资源对它的所有的客户端可用。

AggregatorServer的构建代码也是对应的completeConfig.NewWithDelegate方法中完成,该方法带了一个参数,参数的实例基于主API Server对应的GenericAPIServer实例。

AggregatorServer启动了APIServer的共享通知: &apiregistration.APIService{},基于共享通知的消息,在两个控制器中进行处理:apiserviceRegistrationController和availableController。

在API的处理上也是分为两部分:

  • go-restful部分,也是基于APIGroupInfo来安装API,见k8s.io/kube-aggregator/pkg/registry/apiservice/rest/storage_apiservice.go中的NewRESTStorage方法:
  1. func NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) genericapiserver.APIGroupInfo {
  2. apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, aggregatorscheme.Registry, aggregatorscheme.Scheme, metav1.ParameterCodec, aggregatorscheme.Codecs)
  3. if apiResourceConfigSource.VersionEnabled(v1beta1.SchemeGroupVersion) {
  4. apiGroupInfo.GroupMeta.GroupVersion = v1beta1.SchemeGroupVersion
  5. storage := map[string]rest.Storage{}
  6. apiServiceREST := apiservicestorage.NewREST(aggregatorscheme.Scheme, restOptionsGetter)
  7. storage["apiservices"] = apiServiceREST
  8. storage["apiservices/status"] = apiservicestorage.NewStatusREST(aggregatorscheme.Scheme, apiServiceREST)
  9. apiGroupInfo.VersionedResourcesStorageMap["v1beta1"] = storage
  10. }
  11. if apiResourceConfigSource.VersionEnabled(v1.SchemeGroupVersion) {
  12. apiGroupInfo.GroupMeta.GroupVersion = v1.SchemeGroupVersion
  13. storage := map[string]rest.Storage{}
  14. apiServiceREST := apiservicestorage.NewREST(aggregatorscheme.Scheme, restOptionsGetter)
  15. storage["apiservices"] = apiServiceREST
  16. storage["apiservices/status"] = apiservicestorage.NewStatusREST(aggregatorscheme.Scheme, apiServiceREST)
  17. apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage
  18. }
  19. return apiGroupInfo
  20. }

在代码中,根据是否启用v1beta1和v1版本,实现了apiservice和apiservices/status两种资源类型。

  • non-go-restful部分,见:k8s.io/kubu-aggregator/pkg/apiserver/handler_apis.go中的apisHandler结构
    apisHanders服务与/apis端,它实现了http.Handler,下面是它的ServeHTTP方法的代码实现:
  1. func (r *apisHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  2. ctx, ok := r.mapper.Get(req)
  3. if !ok {
  4. responsewriters.InternalError(w, req, errors.New("no context found for request"))
  5. return
  6. }
  7. discoveryGroupList := &metav1.APIGroupList{
  8. // always add OUR api group to the list first. Since we'll never have a registered APIService for it
  9. // and since this is the crux of the API, having this first will give our names priority. It's good to be king.
  10. Groups: []metav1.APIGroup{discoveryGroup},
  11. }
  12. apiServices, err := r.lister.List(labels.Everything())
  13. if err != nil {
  14. http.Error(w, err.Error(), http.StatusInternalServerError)
  15. return
  16. }
  17. apiServicesByGroup := apiregistrationapi.SortedByGroupAndVersion(apiServices)
  18. for _, apiGroupServers := range apiServicesByGroup {
  19. // skip the legacy group
  20. if len(apiGroupServers[0].Spec.Group) == 0 {
  21. continue
  22. }
  23. discoveryGroup := convertToDiscoveryAPIGroup(apiGroupServers)
  24. if discoveryGroup != nil {
  25. discoveryGroupList.Groups = append(discoveryGroupList.Groups, *discoveryGroup)
  26. }
  27. }
  28. responsewriters.WriteObjectNegotiated(ctx, r.codecs, schema.GroupVersion{}, w, req, http.StatusOK, discoveryGroupList)
  29. }

问:看起来apisHandlers的目的是把找到的合适的api server返回?这点我没有搞明白,理论上,AA应该是做代理功能,把请求转发给对应的自定义API Servers去处理才对。

ANSWER:明白了,apisHandlers其实就是ROOT路径/apis的服务器,本来是由GenericAPIServer.DiscoveryGroupManager来实现这个功能的,但是AA需要实时获取注册进来的自定义API Servers信息,并且AA相关信息总是放在/apis相应的头部,如下所示:(没有自定义的API Servers的情况)

  1. yuxianbing@ubuntu:~$ curl http://127.0.0.1:8080/apis
  2. {
  3. "kind": "APIGroupList",
  4. "apiVersion": "v1",
  5. "groups": [
  6. {
  7. "name": "apiregistration.k8s.io",
  8. "versions": [
  9. {
  10. "groupVersion": "apiregistration.k8s.io/v1",
  11. "version": "v1"
  12. },
  13. {
  14. "groupVersion": "apiregistration.k8s.io/v1beta1",
  15. "version": "v1beta1"
  16. }
  17. ],
  18. "preferredVersion": {
  19. "groupVersion": "apiregistration.k8s.io/v1",
  20. "version": "v1"
  21. },
  22. "serverAddressByClientCIDRs": null
  23. },
  24. ......

GenericAPIServer

下面是GenericAPIServer的完整定义:

  1. // GenericAPIServer contains state for a Kubernetes cluster api server.
  2. type GenericAPIServer struct {
  3. // discoveryAddresses is used to build cluster IPs for discovery.
  4. discoveryAddresses discovery.Addresses
  5. // LoopbackClientConfig is a config for a privileged loopback connection to the API server
  6. LoopbackClientConfig *restclient.Config
  7. // minRequestTimeout is how short the request timeout can be. This is used to build the RESTHandler
  8. minRequestTimeout time.Duration
  9. // ShutdownTimeout is the timeout used for server shutdown. This specifies the timeout before server
  10. // gracefully shutdown returns.
  11. ShutdownTimeout time.Duration
  12. // legacyAPIGroupPrefixes is used to set up URL parsing for authorization and for validating requests
  13. // to InstallLegacyAPIGroup
  14. legacyAPIGroupPrefixes sets.String
  15. // admissionControl is used to build the RESTStorage that backs an API Group.
  16. admissionControl admission.Interface
  17. // requestContextMapper provides a way to get the context for a request. It may be nil.
  18. requestContextMapper apirequest.RequestContextMapper
  19. SecureServingInfo *SecureServingInfo
  20. // ExternalAddress is the address (hostname or IP and port) that should be used in
  21. // external (public internet) URLs for this GenericAPIServer.
  22. ExternalAddress string
  23. // Serializer controls how common API objects not in a group/version prefix are serialized for this server.
  24. // Individual APIGroups may define their own serializers.
  25. Serializer runtime.NegotiatedSerializer
  26. // "Outputs"
  27. // Handler holds the handlers being used by this API server
  28. // 实现了API Server对外的API功能,主要包括两个部分:go-restful和non-go-restful。
  29. Handler *APIServerHandler
  30. // listedPathProvider is a lister which provides the set of paths to show at /
  31. listedPathProvider routes.ListedPathProvider
  32. // DiscoveryGroupManager serves /apis
  33. // 一般用于实现/apis的服务,从/apis的输出来看,基本上显示的是各个Group的API与版本的信息。
  34. DiscoveryGroupManager discovery.GroupManager
  35. // Enable swagger and/or OpenAPI if these configs are non-nil.
  36. swaggerConfig *swagger.Config
  37. openAPIConfig *openapicommon.Config
  38. // PostStartHooks are each called after the server has started listening, in a separate go func for each
  39. // with no guarantee of ordering between them. The map key is a name used for error reporting.
  40. // It may kill the process with a panic if it wishes to by returning an error.
  41. postStartHookLock sync.Mutex
  42. postStartHooks map[string]postStartHookEntry
  43. postStartHooksCalled bool
  44. disabledPostStartHooks sets.String
  45. preShutdownHookLock sync.Mutex
  46. preShutdownHooks map[string]preShutdownHookEntry
  47. preShutdownHooksCalled bool
  48. // healthz checks
  49. healthzLock sync.Mutex
  50. healthzChecks []healthz.HealthzChecker
  51. healthzCreated bool
  52. // auditing. The backend is started after the server starts listening.
  53. AuditBackend audit.Backend
  54. // enableAPIResponseCompression indicates whether API Responses should support compression
  55. // if the client requests it via Accept-Encoding
  56. enableAPIResponseCompression bool
  57. // delegationTarget is the next delegate in the chain or nil
  58. // 实现链式结构,一般指向的实例类型也是GenericAPIServer
  59. delegationTarget DelegationTarget
  60. // HandlerChainWaitGroup allows you to wait for all chain handlers finish after the server shutdown.
  61. HandlerChainWaitGroup *utilwaitgroup.SafeWaitGroup
  62. }

GenericAPIServer里面包含了几个重要的成员,首先是APIServerHandler类型的Handler成员,它存储了该Server所服务的API,后面要讲到的API的安装功能,主要就是把要服务的资源对应的API存储在Handler中的GoRestfulContainer中。

APIServerHandler包含了API Server使用的多种http.Handler类型,包括go-restful以及non-go-restful,以及在以上两者之间选择的Director对象,API URI处理的选择过程为:FullHandlerChain-> Director ->{GoRestfulContainer, NonGoRestfulMux}。

其次是DiscoveryGroupManager,它负责存储支持的API的Group和Version信息,并提供对/apis的调用服务。

APIGroupInfo

前面在三个API Server的API安装中,提到过go-restful模式的API安装都用到了APIGroupInfo,一般都是先生成一个APIGroupInfo实例,在该实例中,把我们关心的资源类型存好,然后调用GenericAPIServer.InstallAPIGroup完成API的Install操作。

  1. // Info about an API group.
  2. type APIGroupInfo struct {
  3. GroupMeta apimachinery.GroupMeta
  4. // Info about the resources in this group. Its a map from version to resource to the storage.
  5. VersionedResourcesStorageMap map[string]map[string]rest.Storage
  6. // OptionsExternalVersion controls the APIVersion used for common objects in the
  7. // schema like api.Status, api.DeleteOptions, and metav1.ListOptions. Other implementors may
  8. // define a version "v1beta1" but want to use the Kubernetes "v1" internal objects.
  9. // If nil, defaults to groupMeta.GroupVersion.
  10. // TODO: Remove this when https://github.com/kubernetes/kubernetes/issues/19018 is fixed.
  11. OptionsExternalVersion *schema.GroupVersion
  12. // MetaGroupVersion defaults to "meta.k8s.io/v1" and is the scheme group version used to decode
  13. // common API implementations like ListOptions. Future changes will allow this to vary by group
  14. // version (for when the inevitable meta/v2 group emerges).
  15. MetaGroupVersion *schema.GroupVersion
  16. // Scheme includes all of the types used by this group and how to convert between them (or
  17. // to convert objects from outside of this group that are accepted in this API).
  18. // TODO: replace with interfaces
  19. Scheme *runtime.Scheme
  20. // NegotiatedSerializer controls how this group encodes and decodes data
  21. NegotiatedSerializer runtime.NegotiatedSerializer
  22. // ParameterCodec performs conversions for query parameters passed to API calls
  23. ParameterCodec runtime.ParameterCodec
  24. }

GenericAPIServer.InstallAPIGroup

InstallAPIGroup负责把给定的API Group暴漏到API中,主要功能有两块:

  • 调用installAPIResources实现资源API的暴漏
  • API Group的的服务,为了/apis和/apis/<GroupName>两种服务,其中/apis通过把信息存放在DiscoveryGroupManager成员中,/apis/<GroupName>的通过构建APIGroupHandler,并存放到GoRestfulContainer中。

代码如下所示:

  1. func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
  2. // Do not register empty group or empty version. Doing so claims /apis/ for the wrong entity to be returned.
  3. // Catching these here places the error much closer to its origin
  4. if len(apiGroupInfo.GroupMeta.GroupVersion.Group) == 0 {
  5. return fmt.Errorf("cannot register handler with an empty group for %#v", *apiGroupInfo)
  6. }
  7. if len(apiGroupInfo.GroupMeta.GroupVersion.Version) == 0 {
  8. return fmt.Errorf("cannot register handler with an empty version for %#v", *apiGroupInfo)
  9. }
  10. // 这里实现资源的API暴漏
  11. if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo); err != nil {
  12. return err
  13. }
  14. // setup discovery
  15. // Install the version handler.
  16. // Add a handler at /apis/<groupName> to enumerate all versions supported by this group.
  17. apiVersionsForDiscovery := []metav1.GroupVersionForDiscovery{}
  18. for _, groupVersion := range apiGroupInfo.GroupMeta.GroupVersions {
  19. // Check the config to make sure that we elide versions that don't have any resources
  20. if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
  21. continue
  22. }
  23. apiVersionsForDiscovery = append(apiVersionsForDiscovery, metav1.GroupVersionForDiscovery{
  24. GroupVersion: groupVersion.String(),
  25. Version: groupVersion.Version,
  26. })
  27. }
  28. preferredVersionForDiscovery := metav1.GroupVersionForDiscovery{
  29. GroupVersion: apiGroupInfo.GroupMeta.GroupVersion.String(),
  30. Version: apiGroupInfo.GroupMeta.GroupVersion.Version,
  31. }
  32. apiGroup := metav1.APIGroup{
  33. Name: apiGroupInfo.GroupMeta.GroupVersion.Group,
  34. Versions: apiVersionsForDiscovery,
  35. PreferredVersion: preferredVersionForDiscovery,
  36. }
  37. // 把该API Group信息存储到DiscoveryGroupManager中,让后续暴漏的/apis中使用
  38. s.DiscoveryGroupManager.AddGroup(apiGroup)
  39. // 生成APIGrouphandler
  40. s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup, s.requestContextMapper).WebService())
  41. return nil
  42. }

GenericAPIServer.installAPIResources

installAPIResources是一个用于安装REST存储的私有方法,用来支撑各种api groupversionresource。该函数的代码逻辑非常简单,它循环扫描APIGroupInfo.GroupMeta.GroupVersions成员,生成相应的APIGroupVersion实例,并通过InstallREST方法把该GroupVersion的资源注册服务。

  1. func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
  2. for _, groupVersion := range apiGroupInfo.GroupMeta.GroupVersions {
  3. if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
  4. glog.Warningf("Skipping API %v because it has no resources.", groupVersion)
  5. continue
  6. }
  7. // 这里生成APIGroupVersion实例,是REST API的关键
  8. apiGroupVersion := s.getAPIGroupVersion(apiGroupInfo, groupVersion, apiPrefix)
  9. if apiGroupInfo.OptionsExternalVersion != nil {
  10. apiGroupVersion.OptionsExternalVersion = apiGroupInfo.OptionsExternalVersion
  11. }
  12. if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
  13. return fmt.Errorf("Unable to setup API %v: %v", apiGroupInfo, err)
  14. }
  15. }
  16. return nil
  17. }

getApiGropuVersion的代码如下,它通过组装了所有的资源对应的storage,并生成了APIGroupVersion实例,代码如下:

  1. func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupVersion schema.GroupVersion, apiPrefix string) *genericapi.APIGroupVersion {
  2. storage := make(map[string]rest.Storage)
  3. for k, v := range apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version] {
  4. storage[strings.ToLower(k)] = v
  5. }
  6. version := s.newAPIGroupVersion(apiGroupInfo, groupVersion)
  7. version.Root = apiPrefix
  8. version.Storage = storage
  9. return version
  10. }

APIGroupVersion是区分API版本的关键,以下是代码:

  1. type APIGroupVersion struct {
  2. Storage map[string]rest.Storage
  3. Root string
  4. // GroupVersion is the external group version
  5. GroupVersion schema.GroupVersion
  6. // OptionsExternalVersion controls the Kubernetes APIVersion used for common objects in the apiserver
  7. // schema like api.Status, api.DeleteOptions, and metav1.ListOptions. Other implementors may
  8. // define a version "v1beta1" but want to use the Kubernetes "v1" internal objects. If
  9. // empty, defaults to GroupVersion.
  10. OptionsExternalVersion *schema.GroupVersion
  11. // MetaGroupVersion defaults to "meta.k8s.io/v1" and is the scheme group version used to decode
  12. // common API implementations like ListOptions. Future changes will allow this to vary by group
  13. // version (for when the inevitable meta/v2 group emerges).
  14. MetaGroupVersion *schema.GroupVersion
  15. Mapper meta.RESTMapper
  16. // Serializer is used to determine how to convert responses from API methods into bytes to send over
  17. // the wire.
  18. Serializer runtime.NegotiatedSerializer
  19. ParameterCodec runtime.ParameterCodec
  20. Typer runtime.ObjectTyper
  21. Creater runtime.ObjectCreater
  22. Convertor runtime.ObjectConvertor
  23. Defaulter runtime.ObjectDefaulter
  24. Linker runtime.SelfLinker
  25. UnsafeConvertor runtime.ObjectConvertor
  26. Admit admission.Interface
  27. Context request.RequestContextMapper
  28. MinRequestTimeout time.Duration
  29. // EnableAPIResponseCompression indicates whether API Responses should support compression
  30. // if the client requests it via Accept-Encoding
  31. EnableAPIResponseCompression bool
  32. }
  • storage
    前面在构建APIGroupVersion 实例的过程中,我们看到第一个成员Storage的身影,这个变量我们要分析清楚,它类型为map[string]rest.Storage。所以storage变量是一个Map,Key为Rest API的path ,Value为rest.Storage接口,此接口是一个通用的符合Restful要求的资源存储服务接口,每个服务接口负责处理一类(Kind)Kubernetes API中的数据对象-----资源你数据,只有一个接口方法:New(),New()方法返回该Storage服务所能识别和管理的某种具体的资源数据逇一个空实例。
  1. type Storage interface {
  2. // New returns an empty object that can be used with Create and Update after request data has been put into it.
  3. // This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
  4. New() runtime.Object
  5. }

在运行期间,Kubernetes API Runtime运行时框架会把New()方法返回的空对象的指针传入Codec.DecodeInto([]byte, runtime.Object)方法中,从而完成HTTP Rest请求中的Byte数组反序列化逻辑。Kubernetes API Server中所有对外提供服务的Restful资源都实现了此接口,这些资源包括pods、bindings、podTemplates、replicationControllers、services等,三个服务都有自己的列表,其中CRD和AA我们已经在前面的讲解中把代码贴出来了。而Master这块其实有两块资源安装:最新的资源以及传统资源模式,后面单独介绍这块。

APIGroupVersion是与rest.Storage Map绑定的,并且绑定了相应版本的Codec、Convertor用于版本转换,这样就很容易理解Kubernetes是怎么区分多版本API的Rest服务的。

  • InstallREST
    在APIGroupVersion的InstallREST(constainer *restful.Container)方法里,用Version变量来构造一个Rest API Path的前缀并赋值给APIInstall的prefix变量,并调用他的Install()方法完成Rest API的转换,代码如下:
  1. func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
  2. prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
  3. installer := &APIInstaller{
  4. group: g,
  5. prefix: prefix,
  6. minRequestTimeout: g.MinRequestTimeout,
  7. enableAPIResponseCompression: g.EnableAPIResponseCompression,
  8. }
  9. apiResources, ws, registrationErrors := installer.Install()
  10. versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources}, g.Context)
  11. versionDiscoveryHandler.AddToWebService(ws)
  12. container.Add(ws)
  13. return utilerrors.NewAggregate(registrationErrors)
  14. }
  • APIInstall.Install
    接着,在APIInstaller的Install()方法里用prefix(API版本)前缀生成WebService的相对根路径:
  1. func (a *APIInstaller) newWebService() *restful.WebService {
  2. ws := new(restful.WebService)
  3. ws.Path(a.prefix)
  4. // a.prefix contains "prefix/group/version"
  5. ws.Doc("API at " + a.prefix)
  6. // Backwards compatibility, we accepted objects with empty content-type at V1.
  7. // If we stop using go-restful, we can default empty content-type to application/json on an
  8. // endpoint by endpoint basis
  9. ws.Consumes("*/*")
  10. mediaTypes, streamMediaTypes := negotiation.MediaTypesForSerializer(a.group.Serializer)
  11. ws.Produces(append(mediaTypes, streamMediaTypes...)...)
  12. ws.ApiVersion(a.group.GroupVersion.String())
  13. return ws
  14. }
  • 如何实现多版本支持呢?

以前面我们列举过的k8s.io/kube-aggregator/pkg/registry/apiservice/rest/storage_apiservice.go中的NewRESTStorage方法中,可以看到,根据是否启用了v1beta1与v1版本的API,我们生成了不同的APIGroupInfo,如果同时启用了v1beta1与v1版本的话,最终调用APIGroupVersion的InstallREST方法,从而完成了最终的多版本API的Rest服务装配流程。

APIInstaller

前面我们多次讲到map[string]rest.Storage,从Master(传统模式与现代模式)、CRD、AA三个服务InstallGroupInfo中,都最终生成了这样一个Map实例。

这个Map的Key是Rest API的访问路径,Value却不是之前说好的restful.Route。所以必须存在一个“转换适配”的方法来实现上述转换!转化你的方法在pkg/apiserver/api_install.go的下属方法里:
func (a *APIInstaller) registerResoruceHandlers(path string, storage rest.Storage, ws *restful.WebService, proxyHandler http.Handler)

上述方法把一个path对应的rest.Storage转换成一系列的restful.Route并添加到指针restful.WebService中。这个函数的代码之所以很长,是因为有各种情况要考虑,比如pods/portforward这种路径要处理child,还要判断美中Storage资源类型锁支持的操作类型:比如是否支持create、delete、update及是否支持list、watch、pathcer等,对各种情况都考虑以后,这个函数的代码量已经超过500行!于是在外面封装了一个简单函数:func (a *APIInstlal)Install,内部循环调用registerResourceHandlers,返回最终的restful.WebService对象,次方法的主要代码如下:

  1. func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
  2. var apiResources []metav1.APIResource
  3. var errors []error
  4. ws := a.newWebService()
  5. proxyHandler := (&handlers.ProxyHandler{
  6. Prefix: a.prefix + "/proxy/",
  7. Storage: a.group.Storage,
  8. Serializer: a.group.Serializer,
  9. Mapper: a.group.Context,
  10. })
  11. // Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
  12. paths := make([]string, len(a.group.Storage))
  13. var i int = 0
  14. for path := range a.group.Storage {
  15. paths[i] = path
  16. i++
  17. }
  18. sort.Strings(paths)
  19. for _, path := range paths {
  20. apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws, proxyHandler)
  21. if err != nil {
  22. errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
  23. }
  24. if apiResource != nil {
  25. apiResources = append(apiResources, *apiResource)
  26. }
  27. }
  28. return apiResources, ws, errors
  29. }

Install()方法循环调用了registerResourceHandlers函数,该函数实现了rest.Storage到restful.Route的转换,由于该函数代码比较长,这里只列举相关的片段:

  1. creater, isCreater := storage.(rest.Creater)
  2. namedCreater, isNamedCreater := storage.(rest.NamedCreater)
  3. lister, isLister := storage.(rest.Lister)
  4. getter, isGetter := storage.(rest.Getter)
  5. getterWithOptions, isGetterWithOptions := storage.(rest.GetterWithOptions)
  6. deleter, isDeleter := storage.(rest.Deleter)
  7. gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
  8. collectionDeleter, isCollectionDeleter := storage.(rest.CollectionDeleter)
  9. updater, isUpdater := storage.(rest.Updater)
  10. patcher, isPatcher := storage.(rest.Patcher)
  11. watcher, isWatcher := storage.(rest.Watcher)
  12. _, isRedirector := storage.(rest.Redirector)
  13. connecter, isConnecter := storage.(rest.Connecter)
  14. storageMeta, isMetadata := storage.(rest.StorageMetadata)

前面我们提到rest.Storage接口只有一个New方法,一般的资源对象存储,出了实现rest.Storage接口之外,还实现多种REST操作接口,如上所示,具体各种资源数据对象的存储对象见ETCD存储分析。这段代码对storage对象进行判断,以确定并标记它锁满足的API Rest接口类型,而接下来的这段代码在此基础上确定此接口所包含的actions,后者则对应到某种HTTP请求方法(GET/POST/PUT/DELETE)或者HTTP PROXY、WATCH、CONNECT等动作;

  1. actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
  2. actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer, false}, isCreater)
  3. actions = appendIf(actions, action{"DELETECOLLECTION", resourcePath, resourceParams, namer, false}, isCollectionDeleter)
  4. // DEPRECATED
  5. actions = appendIf(actions, action{"WATCHLIST", "watch/" + resourcePath, resourceParams, namer, false}, allowWatchList)
  6. actions = appendIf(actions, action{"GET", itemPath, nameParams, namer, false}, isGetter)
  7. if getSubpath {
  8. actions = appendIf(actions, action{"GET", itemPath + "/{path:*}", proxyParams, namer, false}, isGetter)
  9. }
  10. actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
  11. actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
  12. actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isDeleter)
  13. actions = appendIf(actions, action{"WATCH", "watch/" + itemPath, nameParams, namer, false}, isWatcher)
  14. // We add "proxy" subresource to remove the need for the generic top level prefix proxy.
  15. // The generic top level prefix proxy is deprecated in v1.2, and will be removed in 1.3, or 1.4 at the latest.
  16. // TODO: DEPRECATED in v1.2.
  17. actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath + "/{path:*}", proxyParams, namer, false}, isRedirector)
  18. // TODO: DEPRECATED in v1.2.
  19. actions = appendIf(actions, action{"PROXY", "proxy/" + itemPath, nameParams, namer, false}, isRedirector)
  20. actions = appendIf(actions, action{"CONNECT", itemPath, nameParams, namer, false}, isConnecter)
  21. actions = appendIf(actions, action{"CONNECT", itemPath + "/{path:*}", proxyParams, namer, false}, isConnecter && connectSubpath)

我们注意到rest.Redirector类型的storage被当作PROXY进行处理,由apiserver.ProxyHandler进行拦截,并调用rest.Redirector的ResourceLocation方法获取资源的处理路径(可能包含一个非空的http.RoundTripper),用于处理执行Redirector返回的URL请求)。Kubernetes API Server中PROXY请求存在的意义在于透明地访问某个其他节点(比如某个Minion)上的API。

最后,我们来分析下registerResourcesHandles中完成从rest.Storage到restful.Route映射的最后一段关键代码。下面是rest.Getter接口的Storage映射代码:

  1. case "GET": // Get a resource.
  2. var handler restful.RouteFunction
  3. if isGetterWithOptions {
  4. handler = restfulGetResourceWithOptions(getterWithOptions, reqScope, hasSubresource)
  5. } else {
  6. handler = restfulGetResource(getter, exporter, reqScope)
  7. }
  8. if needOverride {
  9. // need change the reported verb
  10. handler = metrics.InstrumentRouteFunc(verbOverrider.OverrideMetricsVerb(action.Verb), resource, subresource, requestScope, handler)
  11. } else {
  12. handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, handler)
  13. }
  14. if a.enableAPIResponseCompression {
  15. handler = genericfilters.RestfulWithCompression(handler, a.group.Context)
  16. }
  17. doc := "read the specified " + kind
  18. if hasSubresource {
  19. doc = "read " + subresource + " of the specified " + kind
  20. }
  21. route := ws.GET(action.Path).To(handler).
  22. Doc(doc).
  23. Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
  24. Operation("read"+namespaced+kind+strings.Title(subresource)+operationSuffix).
  25. Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
  26. Returns(http.StatusOK, "OK", producedObject).
  27. Writes(producedObject)
  28. if isGetterWithOptions {
  29. if err := addObjectParams(ws, route, versionedGetOptions); err != nil {
  30. return nil, err
  31. }
  32. }
  33. if isExporter {
  34. if err := addObjectParams(ws, route, versionedExportOptions); err != nil {
  35. return nil, err
  36. }
  37. }
  38. addParams(route, action.Params)
  39. routes = append(routes, route)

上述代码首先通过函数restfulGetResourceWithOptions或者restfulGetResource创建了一个restful.RouteFunction,然后生成一个restful.route对象,最后注册到 restful.WebService中,从而完成了rest.Storage到Rest服务的“最后一公里”通车。restfulGetResource函数的定义如下:

  1. func restfulGetResource(r rest.Getter, e rest.Exporter, scope handlers.RequestScope) restful.RouteFunction {
  2. return func(req *restful.Request, res *restful.Response) {
  3. handlers.GetResource(r, e, scope)(res.ResponseWriter, req.Request)
  4. }
  5. }

handlers.GetResource的定义如下:

  1. func GetResource(r rest.Getter, e rest.Exporter, scope RequestScope) http.HandlerFunc {
  2. return getResourceHandler(scope,
  3. func(ctx request.Context, name string, req *http.Request, trace *utiltrace.Trace) (runtime.Object, error) {
  4. // check for export
  5. options := metav1.GetOptions{}
  6. if values := req.URL.Query(); len(values) > 0 {
  7. exports := metav1.ExportOptions{}
  8. if err := metainternalversion.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &exports); err != nil {
  9. err = errors.NewBadRequest(err.Error())
  10. return nil, err
  11. }
  12. if exports.Export {
  13. if e == nil {
  14. return nil, errors.NewBadRequest(fmt.Sprintf("export of %q is not supported", scope.Resource.Resource))
  15. }
  16. return e.Export(ctx, name, exports)
  17. }
  18. if err := metainternalversion.ParameterCodec.DecodeParameters(values, scope.MetaGroupVersion, &options); err != nil {
  19. err = errors.NewBadRequest(err.Error())
  20. return nil, err
  21. }
  22. }
  23. if trace != nil {
  24. trace.Step("About to Get from storage")
  25. }
  26. // 这里调用了资源对象存储的Get方法,从而返回具体的资源对象
  27. return r.Get(ctx, name, &options)
  28. })
  29. }

最终,我们看到了API的服务最终通过调用r.Get(ctx, name, &options)方法,从而得以返回某个资源对象。

在上面的查询操作中,没有权限控制,但是查看一下createHandler方法,就能看到权限控制的身影,如下所示的代码片段:

  1. admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, userInfo)
  2. if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
  3. err = mutatingAdmission.Admit(admissionAttributes)
  4. if err != nil {
  5. scope.err(err, w, req)
  6. return
  7. }
  8. }

而对于资源的Create、Update、Delete、Connect、Patch等操作都有类似的权限控制,从Admit的参数admission.Attributes的属性来看,第三方系统可以开发细粒度的权限控制插件,针对任意资源的任意属性进行细粒度的权限控制,因为资源对象本身都传递到参数中了。

  • 资源数据对象的序列化与版本化

这里也列举了createHandler中的代码片段,如下所示:

  1. gv := scope.Kind.GroupVersion()
  2. // 得到合适的SerializerInfo
  3. s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
  4. if err != nil {
  5. scope.err(err, w, req)
  6. return
  7. }
  8. // 找到合适的decoder
  9. decoder := scope.Serializer.DecoderToVersion(s.Serializer, schema.GroupVersion{Group: gv.Group, Version: runtime.APIVersionInternal})
  10. body, err := readBody(req)
  11. if err != nil {
  12. scope.err(err, w, req)
  13. return
  14. }
  15. defaultGVK := scope.Kind
  16. original := r.New()
  17. trace.Step("About to convert to expected version")
  18. // 采用decoder解码
  19. obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
  20. if err != nil {
  21. err = transformDecodeError(typer, err, original, gvk, body)
  22. scope.err(err, w, req)
  23. return
  24. }



作者:何约什
链接:https://www.jianshu.com/p/3d744f4a1317
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/901325
推荐阅读
相关标签
  

闽ICP备14008679号