当前位置:   article > 正文

k8s源码分析——kubelet 创建 pod 的流程_kubelet pod

kubelet pod

上篇文章介绍了 k8s源码分析——kubelet启动分析,本篇文章主要介绍 kubelet 创建 pod 的流程。

kubernetes 版本: v1.13

kubelet 工作原理

kubelet 的工作核心就是在围绕着不同的生产者生产出来的不同的有关 pod 的消息来调用相应的消费者(不同的子模块)完成不同的行为(创建和删除 pod 等),即图中的控制循环(SyncLoop),通过不同的事件驱动这个控制循环运行。

本文仅分析新建 pod 的流程,当一个 pod 完成调度,与一个 node 绑定起来之后,这个 pod 就会触发 kubelet 在循环控制里注册的 handler,上图中的 HandlePods 部分。此时,通过检查 pod 在 kubelet 内存中的状态,kubelet 就能判断出这是一个新调度过来的 pod,从而触发 Handler 里的 ADD 事件对应的逻辑处理。然后 kubelet 会为这个 pod 生成对应的 podStatus,接着检查 pod 所声明的 volume 是不是准备好了,然后调用下层的容器运行时。如果是 update 事件的话,kubelet 就会根据 pod 对象具体的变更情况,调用下层的容器运行时进行容器的重建。

kubelet 创建 pod 的流程

kubelet 创建 pod 的流程

1、kubelet 的控制循环(syncLoop)

syncLoop 中首先定义了一个 syncTicker 和 housekeepingTicker,即使没有需要更新的 pod 配置,kubelet 也会定时去做同步和清理 pod 的工作。然后在 for 循环中一直调用 syncLoopIteration,如果在每次循环过程中出现比较严重的错误,kubelet 会记录到 runtimeState 中,遇到错误就等待 5 秒中继续循环。

  1. func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
  2. glog.Info("Starting kubelet main sync loop.")
  3. // syncTicker 每秒检测一次是否有需要同步的 pod workers
  4. syncTicker := time.NewTicker(time.Second)
  5. defer syncTicker.Stop()
  6. // 每两秒检测一次是否有需要清理的 pod
  7. housekeepingTicker := time.NewTicker(housekeepingPeriod)
  8. defer housekeepingTicker.Stop()
  9. // pod 的生命周期变化
  10. plegCh := kl.pleg.Watch()
  11. const (
  12. base = 100 * time.Millisecond
  13. max = 5 * time.Second
  14. factor = 2
  15. )
  16. duration := base
  17. for {
  18. if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
  19. time.Sleep(duration)
  20. duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
  21. continue
  22. }
  23. ...
  24. kl.syncLoopMonitor.Store(kl.clock.Now())
  25. // 第二个参数为 SyncHandler 类型,SyncHandler 是一个 interface,
  26. // 在该文件开头处定义
  27. if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
  28. break
  29. }
  30. kl.syncLoopMonitor.Store(kl.clock.Now())
  31. }
  32. }

2、监听 pod 变化(syncLoopIteration)

syncLoopIteration 这个方法就会对多个管道进行遍历,发现任何一个管道有消息就交给 handler 去处理。它会从以下管道中获取消息:

  • configCh:该信息源由 kubeDeps 对象中的 PodConfig 子模块提供,该模块将同时 watch 3 个不同来源的 pod 信息的变化(file,http,apiserver),一旦某个来源的 pod 信息发生了更新(创建/更新/删除),这个 channel 中就会出现被更新的 pod 信息和更新的具体操作。
  • syncCh:定时器管道,每隔一秒去同步最新保存的 pod 状态
  • houseKeepingCh:housekeeping 事件的管道,做 pod 清理工作
  • plegCh:该信息源由 kubelet 对象中的 pleg 子模块提供,该模块主要用于周期性地向 container runtime 查询当前所有容器的状态,如果状态发生变化,则这个 channel 产生事件。
  • livenessManager.Updates():健康检查发现某个 pod 不可用,kubelet 将根据 Pod 的restartPolicy 自动执行正确的操作
  1. func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
  2. syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
  3. select {
  4. case u, open := <-configCh:
  5. if !open {
  6. glog.Errorf("Update channel is closed. Exiting the sync loop.")
  7. return false
  8. }
  9. switch u.Op {
  10. case kubetypes.ADD:
  11. ...
  12. case kubetypes.UPDATE:
  13. ...
  14. case kubetypes.REMOVE:
  15. ...
  16. case kubetypes.RECONCILE:
  17. ...
  18. case kubetypes.DELETE:
  19. ...
  20. case kubetypes.RESTORE:
  21. ...
  22. case kubetypes.SET:
  23. ...
  24. }
  25. ...
  26. case e := <-plegCh:
  27. ...
  28. case <-syncCh:
  29. ...
  30. case update := <-kl.livenessManager.Updates():
  31. ...
  32. case <-housekeepingCh:
  33. ...
  34. }
  35. return true
  36. }

3、处理新增 pod(HandlePodAddtions)

对于事件中的每个 pod,执行以下操作:

  • 1、把所有的 pod 按照创建日期进行排序,保证最先创建的 pod 会最先被处理
  • 2、把它加入到 podManager 中,podManager 子模块负责管理这台机器上的 pod 的信息,pod 和 mirrorPod 之间的对应关系等等。所有被管理的 pod 都要出现在里面,如果 podManager 中找不到某个 pod,就认为这个 pod 被删除了
  • 3、如果是 mirror pod 调用其单独的方法
  • 4、验证 pod 是否能在该节点运行,如果不可以直接拒绝
  • 5、通过 dispatchWork 把创建 pod 的工作下发给 podWorkers 子模块做异步处理
  • 6、在 probeManager 中添加 pod,如果 pod 中定义了 readiness 和 liveness 健康检查,启动 goroutine 定期进行检测
  1. func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
  2. start := kl.clock.Now()
  3. // 对所有 pod 按照日期排序,保证最先创建的 pod 优先被处理
  4. sort.Sort(sliceutils.PodsByCreationTime(pods))
  5. for _, pod := range pods {
  6. if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
  7. kl.dnsConfigurer.CheckLimitsForResolvConf()
  8. }
  9. existingPods := kl.podManager.GetPods()
  10. // 把 pod 加入到 podManager 中
  11. kl.podManager.AddPod(pod)
  12. // 判断是否是 mirror pod(即 static pod)
  13. if kubepod.IsMirrorPod(pod) {
  14. kl.handleMirrorPod(pod, start)
  15. continue
  16. }
  17. if !kl.podIsTerminated(pod) {
  18. activePods := kl.filterOutTerminatedPods(existingPods)
  19. // 通过 canAdmitPod 方法校验Pod能否在该计算节点创建(如:磁盘空间)
  20. // Check if we can admit the pod; if not, reject it.
  21. if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
  22. kl.rejectPod(pod, reason, message)
  23. continue
  24. }
  25. }
  26. mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
  27. // 通过 dispatchWork 分发 pod 做异步处理,dispatchWork 主要工作就是把接收到的参数封装成 UpdatePodOptions,调用 UpdatePod 方法.
  28. kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
  29. // 在 probeManager 中添加 pod,如果 pod 中定义了 readiness 和 liveness 健康检查,启动 goroutine 定期进行检测
  30. kl.probeManager.AddPod(pod)
  31. }
  32. }

static pod 是由 kubelet 直接管理的,k8s apiserver 并不会感知到 static pod 的存在,当然也不会和任何一个 rs 关联上,完全是由 kubelet 进程来监管,并在它异常时负责重启。Kubelet 会通过 apiserver 为每一个 static pod 创建一个对应的 mirror pod,如此以来就可以可以通过 kubectl 命令查看对应的 pod,并且可以通过 kubectl logs 命令直接查看到static pod 的日志信息。

4、下发任务(dispatchWork)

dispatchWorker 的主要作用是把某个对 Pod 的操作(创建/更新/删除)下发给 podWorkers。

  1. func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
  2. if kl.podIsTerminated(pod) {
  3. if pod.DeletionTimestamp != nil {
  4. kl.statusManager.TerminatePod(pod)
  5. }
  6. return
  7. }
  8. // 落实在 podWorkers 中
  9. kl.podWorkers.UpdatePod(&UpdatePodOptions{
  10. Pod: pod,
  11. MirrorPod: mirrorPod,
  12. UpdateType: syncType,
  13. OnCompleteFunc: func(err error) {
  14. if err != nil {
  15. metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
  16. }
  17. },
  18. })
  19. if syncType == kubetypes.SyncPodCreate {
  20. metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
  21. }
  22. }

5、更新事件的 channel(UpdatePod)

podWorkers 子模块主要的作用就是处理针对每一个的 Pod 的更新事件,比如 Pod 的创建,删除,更新。而 podWorkers 采取的基本思路是:为每一个 Pod 都单独创建一个 goroutine 和更新事件的 channel,goroutine 会阻塞式的等待 channel 中的事件,并且对获取的事件进行处理。而 podWorkers 对象自身则主要负责对更新事件进行下发。

  1. func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
  2. pod := options.Pod
  3. uid := pod.UID
  4. var podUpdates chan UpdatePodOptions
  5. var exists bool
  6. p.podLock.Lock()
  7. defer p.podLock.Unlock()
  8. // 如果当前 pod 还没有启动过 goroutine ,则启动 goroutine,并且创建 channel
  9. if podUpdates, exists = p.podUpdates[uid]; !exists {
  10. // 创建 channel
  11. podUpdates = make(chan UpdatePodOptions, 1)
  12. p.podUpdates[uid] = podUpdates
  13. // 启动 goroutine
  14. go func() {
  15. defer runtime.HandleCrash()
  16. p.managePodLoop(podUpdates)
  17. }()
  18. }
  19. // 下发更新事件
  20. if !p.isWorking[pod.UID] {
  21. p.isWorking[pod.UID] = true
  22. podUpdates <- *options
  23. } else {
  24. update, found := p.lastUndeliveredWorkUpdate[pod.UID]
  25. if !found || update.UpdateType != kubetypes.SyncPodKill {
  26. p.lastUndeliveredWorkUpdate[pod.UID] = *options
  27. }
  28. }
  29. }

6、调用 syncPodFn 方法同步 pod(managePodLoop)

managePodLoop 调用 syncPodFn 方法去同步 pod,syncPodFn 实际上就是kubelet.SyncPod。在完成这次 sync 动作之后,会调用 wrapUp 函数,这个函数将会做几件事情:

  • 将这个 pod 信息插入 kubelet 的 workQueue 队列中,等待下一次周期性的对这个 pod 的状态进行 sync
  • 将在这次 sync 期间堆积的没有能够来得及处理的最近一次 update 操作加入 goroutine 的事件 channel 中,立即处理。
  1. func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
  2. var lastSyncTime time.Time
  3. for update := range podUpdates {
  4. err := func() error {
  5. podUID := update.Pod.UID
  6. status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
  7. if err != nil {
  8. ...
  9. }
  10. err = p.syncPodFn(syncPodOptions{
  11. mirrorPod: update.MirrorPod,
  12. pod: update.Pod,
  13. podStatus: status,
  14. killPodOptions: update.KillPodOptions,
  15. updateType: update.UpdateType,
  16. })
  17. lastSyncTime = time.Now()
  18. return err
  19. }()
  20. if update.OnCompleteFunc != nil {
  21. update.OnCompleteFunc(err)
  22. }
  23. if err != nil {
  24. ...
  25. }
  26. p.wrapUp(update.Pod.UID, err)
  27. }
  28. }

7、完成创建容器前的准备工作(SyncPod)

在这个方法中,主要完成以下几件事情:

  • 如果是删除 pod,立即执行并返回
  • 同步 podStatus 到 kubelet.statusManager
  • 检查 pod 是否能运行在本节点,主要是权限检查(是否能使用主机网络模式,是否可以以 privileged 权限运行等)。如果没有权限,就删除本地旧的 pod 并返回错误信息
  • 创建 containerManagar 对象,并且创建 pod level cgroup,更新 Qos level cgroup
  • 如果是 static Pod,就创建或者更新对应的 mirrorPod
  • 创建 pod 的数据目录,存放 volume 和 plugin 信息,如果定义了 pv,等待所有的 volume mount 完成(volumeManager 会在后台做这些事情),如果有 image secrets,去 apiserver 获取对应的 secrets 数据
  • 然后调用 kubelet.volumeManager 组件,等待它将 pod 所需要的所有外挂的 volume 都准备好。
  • 调用 container runtime 的 SyncPod 方法,去实现真正的容器创建逻辑

这里所有的事情都和具体的容器没有关系,可以看到该方法是创建 pod 实体(即容器)之前需要完成的准备工作。

  1. func (kl *Kubelet) syncPod(o syncPodOptions) error {
  2. // pull out the required options
  3. pod := o.pod
  4. mirrorPod := o.mirrorPod
  5. podStatus := o.podStatus
  6. updateType := o.updateType
  7. // 是否为 删除 pod
  8. if updateType == kubetypes.SyncPodKill {
  9. ...
  10. }
  11. ...
  12. // 检查 pod 是否能运行在本节点
  13. runnable := kl.canRunPod(pod)
  14. if !runnable.Admit {
  15. ...
  16. }
  17. // 更新 pod 状态
  18. kl.statusManager.SetPodStatus(pod, apiPodStatus)
  19. // 如果 pod 非 running 状态则直接 kill 掉
  20. if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed {
  21. ...
  22. }
  23. // 加载网络插件
  24. if rs := kl.runtimeState.networkErrors(); len(rs) != 0 && !kubecontainer.IsHostNetworkPod(pod) {
  25. ...
  26. }
  27. pcm := kl.containerManager.NewPodContainerManager()
  28. if !kl.podIsTerminated(pod) {
  29. ...
  30. // 创建并更新 pod 的 cgroups
  31. if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
  32. if !pcm.Exists(pod) {
  33. ...
  34. }
  35. }
  36. }
  37. // 为 static pod 创建对应的 mirror pod
  38. if kubepod.IsStaticPod(pod) {
  39. ...
  40. }
  41. // 创建数据目录
  42. if err := kl.makePodDataDirs(pod); err != nil {
  43. ...
  44. }
  45. // 挂载 volume
  46. if !kl.podIsTerminated(pod) {
  47. if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
  48. ...
  49. }
  50. }
  51. // 获取 secret 信息
  52. pullSecrets := kl.getPullSecretsForPod(pod)
  53. // 调用 containerRuntime 的 SyncPod 方法开始创建容器
  54. result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
  55. kl.reasonCache.Update(pod.UID, result)
  56. if err := result.Error(); err != nil {
  57. ...
  58. }
  59. return nil
  60. }

8、创建容器

containerRuntime(pkg/kubelet/kuberuntime)子模块的 SyncPod 函数才是真正完成 pod 内容器实体的创建。
syncPod 主要执行以下几个操作:

  • 1、计算 sandbox 和 container 是否发生变化
  • 2、创建 sandbox 容器
  • 3、启动 init 容器
  • 4、启动业务容器

initContainers 可以有多个,多个 container 严格按照顺序启动,只有当前一个 container 退出了以后,才开始启动下一个 container。

  1. func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
  2. // 1、计算 sandbox 和 container 是否发生变化
  3. podContainerChanges := m.computePodActions(pod, podStatus)
  4. if podContainerChanges.CreateSandbox {
  5. ref, err := ref.GetReference(legacyscheme.Scheme, pod)
  6. if err != nil {
  7. glog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err)
  8. }
  9. ...
  10. }
  11. // 2、kill 掉 sandbox 已经改变的 pod
  12. if podContainerChanges.KillPod {
  13. ...
  14. } else {
  15. // 3、kill 掉非 running 状态的 containers
  16. ...
  17. for containerID, containerInfo := range podContainerChanges.ContainersToKill {
  18. ...
  19. if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
  20. ...
  21. }
  22. }
  23. }
  24. m.pruneInitContainersBeforeStart(pod, podStatus)
  25. podIP := ""
  26. if podStatus != nil {
  27. podIP = podStatus.IP
  28. }
  29. // 4、创建 sandbox
  30. podSandboxID := podContainerChanges.SandboxID
  31. if podContainerChanges.CreateSandbox {
  32. podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt)
  33. if err != nil {
  34. ...
  35. }
  36. ...
  37. podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
  38. if err != nil {
  39. ...
  40. }
  41. // 如果 pod 网络是 host 模式,容器也相同;其他情况下,容器会使用 None 网络模式,让 kubelet 的网络插件自己进行网络配置
  42. if !kubecontainer.IsHostNetworkPod(pod) {
  43. podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)
  44. glog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))
  45. }
  46. }
  47. configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID)
  48. result.AddSyncResult(configPodSandboxResult)
  49. // 获取 PodSandbox 的配置(如:metadata,clusterDNS,容器的端口映射等)
  50. podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt)
  51. ...
  52. // 5、启动 init container
  53. if container := podContainerChanges.NextInitContainerToStart; container != nil {
  54. ...
  55. if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeInit); err != nil {
  56. ...
  57. }
  58. }
  59. // 6、启动业务容器
  60. for _, idx := range podContainerChanges.ContainersToStart {
  61. ...
  62. if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); err != nil {
  63. ...
  64. }
  65. }
  66. return
  67. }

9、启动容器

最终由 startContainer 完成容器的启动,其主要有以下几个步骤:

  • 1、拉取镜像
  • 2、生成业务容器的配置信息
  • 3、调用 docker api 创建容器
  • 4、启动容器
  • 5、执行 post start hook
  1. func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, container *v1.Container, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, containerType kubecontainer.ContainerType) (string, error) {
  2. // 1、检查业务镜像是否存在,不存在则到 Docker Registry 或是 Private Registry 拉取镜像。
  3. imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets)
  4. if err != nil {
  5. ...
  6. }
  7. ref, err := kubecontainer.GenerateContainerRef(pod, container)
  8. if err != nil {
  9. ...
  10. }
  11. // 设置 RestartCount
  12. restartCount := 0
  13. containerStatus := podStatus.FindContainerStatusByName(container.Name)
  14. if containerStatus != nil {
  15. restartCount = containerStatus.RestartCount + 1
  16. }
  17. // 2、生成业务容器的配置信息
  18. containerConfig, cleanupAction, err := m.generateContainerConfig(container, pod, restartCount, podIP, imageRef, containerType)
  19. if cleanupAction != nil {
  20. defer cleanupAction()
  21. }
  22. ...
  23. // 3、通过 client.CreateContainer 调用 docker api 创建业务容器
  24. containerID, err := m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)
  25. if err != nil {
  26. ...
  27. }
  28. err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
  29. if err != nil {
  30. ...
  31. }
  32. ...
  33. // 3、启动业务容器
  34. err = m.runtimeService.StartContainer(containerID)
  35. if err != nil {
  36. ...
  37. }
  38. containerMeta := containerConfig.GetMetadata()
  39. sandboxMeta := podSandboxConfig.GetMetadata()
  40. legacySymlink := legacyLogSymlink(containerID, containerMeta.Name, sandboxMeta.Name,
  41. sandboxMeta.Namespace)
  42. containerLog := filepath.Join(podSandboxConfig.LogDirectory, containerConfig.LogPath)
  43. if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
  44. if err := m.osInterface.Symlink(containerLog, legacySymlink); err != nil {
  45. glog.Errorf("Failed to create legacy symbolic link %q to container %q log %q: %v",
  46. legacySymlink, containerID, containerLog, err)
  47. }
  48. }
  49. // 4、执行 post start hook
  50. if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
  51. kubeContainerID := kubecontainer.ContainerID{
  52. Type: m.runtimeName,
  53. ID: containerID,
  54. }
  55. // runner.Run 这个方法的主要作用就是在业务容器起来的时候,
  56. // 首先会执行一个 container hook(PostStart 和 PreStop),做一些预处理工作。
  57. // 只有 container hook 执行成功才会运行具体的业务服务,否则容器异常。
  58. msg, handlerErr := m.runner.Run(kubeContainerID, pod, container, container.Lifecycle.PostStart)
  59. if handlerErr != nil {
  60. ...
  61. }
  62. }
  63. return "", nil
  64. }

总结

本文主要讲述了 kubelet 从监听到有容器调度至本节点再到容器创建的一个过程,kubelet 最终调用 docker api 来创建容器的。结合上篇文章,可以看出 kubelet 从启动到创建 pod 的一个清晰过程。

参考:
k8s源码分析-kubelet

Kubelet源码分析(一):启动流程分析

kubelet 源码分析:pod 新建流程

kubelet创建Pod流程解析

Kubelet: Pod Lifecycle Event Generator (PLEG) Design- proposals

kubelet 创建 pod 的流程

Kubernetes源码分析——kubelet

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/563714
推荐阅读
相关标签
  

闽ICP备14008679号