当前位置:   article > 正文

【kubernetes/k8s源码分析】coredns 源码分析之三 kubernetes 插件

【kubernetes/k8s源码分析】coredns 源码分析之三 kubernetes 插件

 

 

    插件描述: https://coredns.io/plugins/

 

1. kubernetes 插件

kubernetes [ZONES...] {
    endpoint URL
    tls CERT KEY CACERT
    kubeconfig KUBECONFIG CONTEXT
    namespaces NAMESPACE...
    labels EXPRESSION
    pods POD-MODE
    endpoint_pod_names
    ttl TTL
    noendpoints
    transfer to ADDRESS...
    fallthrough [ZONES...]
    ignore empty_service
}

    1.1 init 函数   

      一样的套路,调用 caddy.RegisterPlugin 函数注册 kubernetes 插件

  1. func init() {
  2. // Kubernetes plugin uses the kubernetes library, which now uses klog, we must set and parse this flag
  3. // so we don't log to the filesystem, which can fill up and crash CoreDNS indirectly by calling os.Exit().
  4. // We also set: os.Stderr = os.Stdout in the setup function below so we output to standard out; as we do for
  5. // all CoreDNS logging. We can't do *that* in the init function, because we, when starting, also barf some
  6. // things to stderr.
  7. klogFlags := flag.NewFlagSet("klog", flag.ExitOnError)
  8. klog.InitFlags(klogFlags)
  9. logtostderr := klogFlags.Lookup("logtostderr")
  10. logtostderr.Value.Set("true")
  11. caddy.RegisterPlugin("kubernetes", caddy.Plugin{
  12. ServerType: "dns",
  13. Action: setup,
  14. })
  15. }

    1.2 setup 函数

     setup 函数调用 kubernetesParse 解析配置,kubernetesParse 函数中再调用 ParseStanza 函数解析每个配置块

  1. func setup(c *caddy.Controller) error {
  2. // See comment in the init function.
  3. os.Stderr = os.Stdout
  4. k, err := kubernetesParse(c)
  5. if err != nil {
  6. return plugin.Error("kubernetes", err)
  7. }

     1.2.1 kubernetes 结构体

     interfaceAddrsFunc 设置为 localPodIP

     autoPathSearch 设置为 searchFromResolvConf,从 /etc/resolv.conf 配置中得到 search 列表(例如 search default.svc.cluster.local. svc.cluster.local. cluster.local.)

  1. // Kubernetes implements a plugin that connects to a Kubernetes cluster.
  2. type Kubernetes struct {
  3. Next plugin.Handler
  4. Zones []string
  5. Upstream *upstream.Upstream
  6. APIServerList []string
  7. APICertAuth string
  8. APIClientCert string
  9. APIClientKey string
  10. ClientConfig clientcmd.ClientConfig
  11. APIConn dnsController
  12. Namespaces map[string]struct{}
  13. podMode string
  14. endpointNameMode bool
  15. Fall fall.F
  16. ttl uint32
  17. opts dnsControlOpts
  18. primaryZoneIndex int
  19. interfaceAddrsFunc func() net.IP
  20. autoPathSearch []string // Local search path from /etc/resolv.conf. Needed for autopath.
  21. TransferTo []string
  22. }

     1.2.2 对于每一个配置块,读取配置

     endpoint_pod_names:在 A 记录中使用 pod 名字,例如 endpoint-name.my-service.namespace.svc.cluster.local. in A 1.2.3.4

     pods:POD-MODE (disabled / insecure / verified),例如 1-2-3-4.ns.pod.cluster.local. in A 1.2.3.4

     namespaces: NAMESPACE [NAMESPACE…] ,暴露的 k8s namespaces 列表,如果省略则暴露所有 namespaces,这个可以用于 namespace 中的 DNS 隔离

    kubeconfig: KUBECONFIG CONTEXT,连接 k8s 的证书配置文件

    namespace_labels:EXPRESSION,用于匹配 namespace label,可以用于一组 namespace

    fallthrough:[ZONES…],如果指定 in-addr.arpa ip6.arpa,只有这些 zone 的查询才会 fallthrough

kubernetes [ZONES...] {
    endpoint URL
    tls CERT KEY CACERT
    kubeconfig KUBECONFIG CONTEXT
    namespaces NAMESPACE...
    labels EXPRESSION
    pods POD-MODE
    endpoint_pod_names
    ttl TTL
    noendpoints
    transfer to ADDRESS...
    fallthrough [ZONES...]
    ignore empty_service
}

  1. for c.NextBlock() {
  2. switch c.Val() {
  3. case "endpoint_pod_names":
  4. args := c.RemainingArgs()
  5. if len(args) > 0 {
  6. return nil, c.ArgErr()
  7. }
  8. k8s.endpointNameMode = true
  9. continue
  10. case "pods":
  11. args := c.RemainingArgs()
  12. if len(args) == 1 {
  13. switch args[0] {
  14. case podModeDisabled, podModeInsecure, podModeVerified:
  15. k8s.podMode = args[0]
  16. default:
  17. return nil, fmt.Errorf("wrong value for pods: %s, must be one of: disabled, verified, insecure", args[0])
  18. }
  19. continue
  20. }
  21. return nil, c.ArgErr()

     注意,namespaces 和 namespace_labels 不能同时设置 

  1. if len(k8s.Namespaces) != 0 && k8s.opts.namespaceLabelSelector != nil {
  2. return nil, c.Errf("namespaces and namespace_labels cannot both be set")
  3. }

     1.2.3 InitKubeCache 

      建立 k8s 客户端连接,namespace_labels 初始化,调用 newdnsController 实例化 dnsControl

  1. // InitKubeCache initializes a new Kubernetes cache.
  2. func (k *Kubernetes) InitKubeCache() (err error) {
  3. if k.opts.namespaceLabelSelector != nil {
  4. var selector labels.Selector
  5. selector, err = meta.LabelSelectorAsSelector(k.opts.namespaceLabelSelector)
  6. if err != nil {
  7. return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.namespaceLabelSelector, err)
  8. }
  9. k.opts.namespaceSelector = selector
  10. }

     1.2.3.1 newdnsController 函数实例化 dnsControl

      包括 client、label 选择器(selector)、namespace label 选择器(namespaceSelector)等

  1. // newDNSController creates a controller for CoreDNS.
  2. func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dnsControl {
  3. dns := dnsControl{
  4. client: kubeClient,
  5. selector: opts.selector,
  6. namespaceSelector: opts.namespaceSelector,
  7. stopCh: make(chan struct{}),
  8. zones: opts.zones,
  9. endpointNameMode: opts.endpointNameMode,
  10. }

     1.2.3.2 service、pod 设置 informer 机制

      个人建议:如果设置了 namespace_labels,已经进行了隔离,就没有必要 list watch 所有的 service,只关注相关 namespace 下的 service 即可,这样可以减少缓存的数量

  1. dns.svcLister, dns.svcController = object.NewIndexerInformer(
  2. &cache.ListWatch{
  3. ListFunc: serviceListFunc(dns.client, api.NamespaceAll, dns.selector),
  4. WatchFunc: serviceWatchFunc(dns.client, api.NamespaceAll, dns.selector),
  5. },
  6. &api.Service{},
  7. opts.resyncPeriod,
  8. cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
  9. cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc},
  10. object.ToService,
  11. )
  12. if opts.initPodCache {
  13. dns.podLister, dns.podController = object.NewIndexerInformer(
  14. &cache.ListWatch{
  15. ListFunc: podListFunc(dns.client, api.NamespaceAll, dns.selector),
  16. WatchFunc: podWatchFunc(dns.client, api.NamespaceAll, dns.selector),
  17. },
  18. &api.Pod{},
  19. opts.resyncPeriod,
  20. cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
  21. cache.Indexers{podIPIndex: podIPIndexFunc},
  22. object.ToPod,
  23. )
  24. }

     1.2.3.3 endpoint namespace 的 informer 机制

      注意:namespace 的 informer 只关注匹配 namespace label 选择器的 namespace

  1. if opts.initEndpointsCache {
  2. dns.epLister, dns.epController = object.NewIndexerInformer(
  3. &cache.ListWatch{
  4. ListFunc: endpointsListFunc(dns.client, api.NamespaceAll, dns.selector),
  5. WatchFunc: endpointsWatchFunc(dns.client, api.NamespaceAll, dns.selector),
  6. },
  7. &api.Endpoints{},
  8. opts.resyncPeriod,
  9. cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
  10. cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
  11. object.ToEndpoints)
  12. }
  13. dns.nsLister, dns.nsController = cache.NewInformer(
  14. &cache.ListWatch{
  15. ListFunc: namespaceListFunc(dns.client, dns.namespaceSelector),
  16. WatchFunc: namespaceWatchFunc(dns.client, dns.namespaceSelector),
  17. },
  18. &api.Namespace{},
  19. opts.resyncPeriod,
  20. cache.ResourceEventHandlerFuncs{})

     1.2.4 RegisterKubeCache 向 caddy 服务框架注册 OnStartup 和 OnShutdown 函数

  1. // RegisterKubeCache registers KubeCache start and stop functions with Caddy
  2. func (k *Kubernetes) RegisterKubeCache(c *caddy.Controller) {
  3. c.OnStartup(func() error {
  4. go k.APIConn.Run()
  5. timeout := time.After(5 * time.Second)
  6. ticker := time.NewTicker(100 * time.Millisecond)
  7. for {
  8. select {
  9. case <-ticker.C:
  10. if k.APIConn.HasSynced() {
  11. return nil
  12. }
  13. case <-timeout:
  14. return nil
  15. }
  16. }
  17. })
  18. c.OnShutdown(func() error {
  19. return k.APIConn.Stop()
  20. })
  21. }

     1.2.4.1 Run 函数启动 service、endpoints、pod、namespace 这些 controller

  1. // Run starts the controller.
  2. func (dns *dnsControl) Run() {
  3. go dns.svcController.Run(dns.stopCh)
  4. if dns.epController != nil {
  5. go dns.epController.Run(dns.stopCh)
  6. }
  7. if dns.podController != nil {
  8. go dns.podController.Run(dns.stopCh)
  9. }
  10. go dns.nsController.Run(dns.stopCh)
  11. <-dns.stopCh
  12. }

     1.2.4.2 HasSynced 判断所有 controller 是否都已完成同步

  1. // HasSynced calls on all controllers.
  2. func (dns *dnsControl) HasSynced() bool {
  3. a := dns.svcController.HasSynced()
  4. b := true
  5. if dns.epController != nil {
  6. b = dns.epController.HasSynced()
  7. }
  8. c := true
  9. if dns.podController != nil {
  10. c = dns.podController.HasSynced()
  11. }
  12. d := dns.nsController.HasSynced()
  13. return a && b && c && d
  14. }

     1.2.5 调用 AddPlugin 注册 kubernetes 插件

  1. dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler {
  2. k.Next = next
  3. return k
  4. })

 

2. ServiceBackend 接口

  1. // ServiceBackend defines a (dynamic) backend that returns a slice of service definitions.
  2. type ServiceBackend interface {
  3. // Services communicates with the backend to retrieve the service definitions. Exact indicates
  4. // on exact match should be returned.
  5. Services(ctx context.Context, state request.Request, exact bool, opt Options) ([]msg.Service, error)
  6. // Reverse communicates with the backend to retrieve service definition based on a IP address
  7. // instead of a name. I.e. a reverse DNS lookup.
  8. Reverse(ctx context.Context, state request.Request, exact bool, opt Options) ([]msg.Service, error)
  9. // Lookup is used to find records else where.
  10. Lookup(ctx context.Context, state request.Request, name string, typ uint16) (*dns.Msg, error)
  11. // Returns _all_ services that matches a certain name.
  12. // Note: it does not implement a specific service.
  13. Records(ctx context.Context, state request.Request, exact bool) ([]msg.Service, error)
  14. // IsNameError return true if err indicated a record not found condition
  15. IsNameError(err error) bool
  16. Transferer
  17. }

 

3. kubernetes 的 ServeDNS 方法

  1. // ServeDNS implements the plugin.Handler interface.
  2. func (k Kubernetes) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
  3. state := request.Request{W: w, Req: r}
  4. qname := state.QName()
  5. zone := plugin.Zones(k.Zones).Matches(qname)
  6. if zone == "" {
  7. return plugin.NextOrFailure(k.Name(), k.Next, ctx, w, r)
  8. }
  9. zone = qname[len(qname)-len(zone):] // maintain case of original query
  10. state.Zone = zone

    3.1 A 记录

     A 记录:A(Address)记录是用来指定主机名(或域名)对应的 IP 地址的记录;checkForApex 函数中调用 Services 方法找到对应的 svc

  1. // A returns A records from Backend or an error.
  2. func A(ctx context.Context, b ServiceBackend, zone string, state request.Request, previousRecords []dns.RR, opt Options) (records []dns.RR, err error) {
  3. services, err := checkForApex(ctx, b, zone, state, opt)
  4. if err != nil {
  5. return nil, err
  6. }

    3.2 对所有符合的 service 遍历

  1. for _, serv := range services {
  2. what, ip := serv.HostType()
  3. switch what {
  4. case dns.TypeCNAME:
  5. if Name(state.Name()).Matches(dns.Fqdn(serv.Host)) {
  6. // x CNAME x is a direct loop, don't add those
  7. continue
  8. }
  9. newRecord := serv.NewCNAME(state.QName(), serv.Host)
  10. if len(previousRecords) > 7 {
  11. // don't add it, and just continue
  12. continue
  13. }
  14. if dnsutil.DuplicateCNAME(newRecord, previousRecords) {
  15. continue
  16. }
  17. if dns.IsSubDomain(zone, dns.Fqdn(serv.Host)) {
  18. state1 := state.NewWithQuestion(serv.Host, state.QType())
  19. state1.Zone = zone
  20. nextRecords, err := A(ctx, b, zone, state1, append(previousRecords, newRecord), opt)
  21. if err == nil {
  22. // Not only have we found something we should add the CNAME and the IP addresses.
  23. if len(nextRecords) > 0 {
  24. records = append(records, newRecord)
  25. records = append(records, nextRecords...)
  26. }
  27. }
  28. continue
  29. }
  30. // This means we can not complete the CNAME, try to look else where.
  31. target := newRecord.Target
  32. // Lookup
  33. m1, e1 := b.Lookup(ctx, state, target, state.QType())
  34. if e1 != nil {
  35. continue
  36. }
  37. // Len(m1.Answer) > 0 here is well?
  38. records = append(records, newRecord)
  39. records = append(records, m1.Answer...)
  40. continue
  41. case dns.TypeA:
  42. if _, ok := dup[serv.Host]; !ok {
  43. dup[serv.Host] = struct{}{}
  44. records = append(records, serv.NewA(state.QName(), ip))
  45. }
  46. case dns.TypeAAAA:
  47. // nada
  48. }
  49. }

 

总结:

    pod 内域名解析,如果 dnsPolicy: ClusterFirst,则会依次尝试解析 XXX.default.svc.cluster.local.  XXX.svc.cluster.local.  XXX.cluster.local.

    namespaces 和 namespace_labels 不能同时设置。个人意见:对于 namespaces,既然只关注设置的 namespace,可以只 list watch 这些 namespace 下的 service,不需要 list watch 所有 namespace

 

 A记录: A(Address)记录是用来指定主机名(或域名)对应的IP地址记录

NS记录: NS(Name Server)记录是域名服务器记录,用来指定该域名由哪个DNS服务器来进行解析

PTR:pointer 的简写,用于将一个IP地址映射到对应的域名,也可以看成是A记录的反向,IP地址的反向解析

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号