当前位置:   article > 正文

Spark Kubernetes 的源码分析系列 - features

Spark Kubernetes 的源码分析系列 - features

1 Overview

features 包里的代码,主要是用于构建 Spark 在 K8S 中的各类资源所需要的特征,个人觉得可以理解成这些 features 就是帮你写各类 Kind 的 YAML 文件。

2 分析

看看 features 包里的代码。这里面都是 Spark 在 K8S 中构建各种资源的步骤。

  1. /path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features
  2. ├── BasicDriverFeatureStep.scala
  3. ├── BasicExecutorFeatureStep.scala
  4. ├── DriverCommandFeatureStep.scala
  5. ├── DriverKubernetesCredentialsFeatureStep.scala
  6. ├── DriverServiceFeatureStep.scala
  7. ├── EnvSecretsFeatureStep.scala
  8. ├── ExecutorKubernetesCredentialsFeatureStep.scala
  9. ├── HadoopConfDriverFeatureStep.scala
  10. ├── KerberosConfDriverFeatureStep.scala
  11. ├── KubernetesFeatureConfigStep.scala
  12. ├── LocalDirsFeatureStep.scala
  13. ├── MountSecretsFeatureStep.scala
  14. ├── MountVolumesFeatureStep.scala
  15. └── PodTemplateConfigMapStep.scala

还记得 Spark Kubernetes 的源码分析系列 - submit 文章里提到的,在 KubernetesDriverBuilder 中,有一个 features 这个变量,这里需要 new 很多配置,也就是具体的用来配置 Pod 的一些步骤。

  1. val features = Seq(
  2. new BasicDriverFeatureStep(conf),
  3. new DriverKubernetesCredentialsFeatureStep(conf),
  4. new DriverServiceFeatureStep(conf),
  5. new MountSecretsFeatureStep(conf),
  6. new EnvSecretsFeatureStep(conf),
  7. new LocalDirsFeatureStep(conf),
  8. new MountVolumesFeatureStep(conf),
  9. new DriverCommandFeatureStep(conf),
  10. new HadoopConfDriverFeatureStep(conf),
  11. new KerberosConfDriverFeatureStep(conf),
  12. new PodTemplateConfigMapStep(conf))

下面我们按照顺序来分析一下。

2.1 BasicDriverFeatureStep

类名就告诉我们,他是干嘛用的了,就是 Driver Feature 相对 Basic 的部分 feature,那么 Baisc 的 feature 包括什么呢?

  1. driverPodName // Driver Pod 的名字
  2. driverContainerImage // Driver Container
  3. driverCpuCores // Driver 需要的 Cpu Cores
  4. driverCoresRequest // Driver 的 Request Cpu Cores(K8S相关)
  5. driverLimitCores // Driver 的 Limit Cpu Cores(K8S相关)
  6. driverMemoryMiB // Driver 的内存 MiB
  7. overheadFactor // 这个稍后会讲到
  8. memoryOverheadMiB // 这个稍后会讲到
  9. driverMemoryWithOverheadMiB // 这个稍后会讲到

以上的参数,生成后,主要是用于配置 Pod 和 Container 的参数。这一块内容又长又臭,可以看看我写在里面的注释。

  1. # 一堆的 Builder
  2. val driverContainer = new ContainerBuilder(pod.container)
  3. # Container Name
  4. .withName(Option(pod.container.getName).getOrElse(DEFAULT_DRIVER_CONTAINER_NAME))
  5. # Image Name
  6. .withImage(driverContainerImage)
  7. # Image 拉取的策略
  8. .withImagePullPolicy(conf.imagePullPolicy)
  9. # Driver 的端口
  10. .addNewPort()
  11. .withName(DRIVER_PORT_NAME)
  12. .withContainerPort(driverPort)
  13. .withProtocol("TCP")
  14. .endPort()
  15. .addNewPort()
  16. # Block Manager 的 Port 相关配置
  17. .withName(BLOCK_MANAGER_PORT_NAME)
  18. .withContainerPort(driverBlockManagerPort)
  19. .withProtocol("TCP")
  20. .endPort()
  21. .addNewPort()
  22. # Spark UI 的端口配置
  23. .withName(UI_PORT_NAME)
  24. .withContainerPort(driverUIPort)
  25. .withProtocol("TCP")
  26. .endPort()
  27. .addNewEnv()
  28. # 一些环境变量
  29. .withName(ENV_SPARK_USER)
  30. .withValue(Utils.getCurrentUserName())
  31. .endEnv()
  32. .addAllToEnv(driverCustomEnvs.asJava)
  33. .addNewEnv()
  34. .withName(ENV_DRIVER_BIND_ADDRESS)
  35. .withValueFrom(new EnvVarSourceBuilder()
  36. .withNewFieldRef("v1", "status.podIP")
  37. .build())
  38. .endEnv()
  39. .editOrNewResources()
  40. # cpu 相关配置
  41. .addToRequests("cpu", driverCpuQuantity)
  42. .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
  43. .addToRequests("memory", driverMemoryQuantity)
  44. .addToLimits("memory", driverMemoryQuantity)
  45. .addToLimits(driverResourceQuantities.asJava)
  46. .endResources()
  47. # 终于 build 完
  48. .build()
  49. val driverPod = new PodBuilder(pod.pod)
  50. # 如果 Pod 是存在的,表示要么修改,否则就是新增
  51. .editOrNewMetadata()
  52. # Pod 的名字
  53. .withName(driverPodName)
  54. # Pod 的 Label
  55. .addToLabels(conf.labels.asJava)
  56. .addToAnnotations(conf.annotations.asJava)
  57. .endMetadata()
  58. .editOrNewSpec()
  59. # Pod 的重启策略
  60. .withRestartPolicy("Never")
  61. # Pod 的 NodeSelector 特性
  62. .addToNodeSelector(conf.nodeSelector.asJava)
  63. # 拉取镜像的 Repository 密码(ru
  64. .addToImagePullSecrets(conf.imagePullSecrets: _*)
  65. .endSpec()
  66. .build()

此外 getAdditionalPodSystemProperties() 还需要这个方法是拉取其他的配置,比如说 spark.app.id 等等,不赘述了。

2.2 DriverKubernetesCredentialsFeatureStep

这个 Step 是用于配置 Driver 的安全认证相关的配置,一般认为就是 K8S 那一套安全认证的机制了。

  1. maybeMountedOAuthTokenFile // OAuthToken 文件
  2. maybeMountedClientKeyFile // Client Key 文件
  3. maybeMountedClientCertFile // Cient Cert 文件
  4. maybeMountedCaCertFile // Ca Cert 文件
  5. driverServiceAccount // Driver 的 Service Account
  6. oauthTokenBase64 // OauthToken Base64 编码
  7. caCertDataBase64 // CaCert 里面的数据 Base64 编码
  8. clientKeyDataBase64 // Client Key 数据的 Base64 编码
  9. clientCertDataBase64 // Client Cert 数据的 Base 64 编码
  10. shouldMountSecret // 是否需要挂载 Secret
  11. driverCredentialsSecretName // Driver 的认证 Secret 名

这里有很多关于访问 ApiServer 的安全认证的细节,如果不熟悉 K8S 的同学,需要补补课。下面是这个 Step 的关键方法,也就是把这些安全相关的文件通过 secret 保存下来。

  1. override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
  2. // 如果 conf 存在以上提及的一些认证文件,则会进行挂载 Secret
  3. if (shouldMountSecret) {
  4. Seq(createCredentialsSecret())
  5. } else {
  6. Seq.empty
  7. }
  8. }

2.3 DriverServiceFeatureStep

这个就是配置 Driver Service 的 Step,因为 Pod 在 K8S 集群里,创建 Executor 需要不同的 Executor Pod 访问到 Driver Pod,才能注册上,也包括 Block Manager 以及 Spark UI 的端口和服务负载配置。

  1. preferredServiceName // Service Name
  2. resolvedServiceName // 上面的 Service Name 超过63个字符的话需要重新配置
  3. driverPort // Driver 的端口
  4. driverBlockManagerPort // Block Manager 的端口
  5. driverUIPort // Spark UI 的端口

上面的 Service Name 超过63个字符的话需要重新配置。

  1. private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
  2. preferredServiceName
  3. } else {
  4. // 超过63个字符,就是需要系统内部重置这个名字了
  5. val randomServiceId = KubernetesUtils.uniqueID(clock = clock)
  6. val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
  7. logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
  8. s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
  9. s"$shorterServiceName as the driver service's name.")
  10. shorterServiceName
  11. }

2.4 MountSecretsFeatureStep

2.5 EnvSecretsFeatureStep

2.6 LocalDirsFeatureStep

  1. resolvedLocalDirs // 本地目录
  2. useLocalDirTmpFs // 如果 conf 配置为 true,则表示本地目录会用其他的存储系统,例如内存,具体请看 spark.kubernetes.local.dirs.tmpfs

2.7 MountVolumesFeatureStep

2.8 DriverCommandFeatureStep

这是关于 Driver 命令行的一些配置,具体看看注释是怎么解释的。

  1. /**
  2. * Creates the driver command for running the user app, and propagates needed configuration so
  3. * executors can also find the app code.
  4. */

2.9 HadoopConfDriverFeatureStep

这是用于挂载 Hadoop 配置文件的 Step,例如访问 HDFS 的时候,需要 core-site.x ml,hdfs-site.xml 等等。

  1. confDir // Hadoop 相关的环境变量 HADOOP_CONF_DIR
  2. existingConfMap // spark.kubernetes.hadoop.configMapName 提交任务的 configMap 名字,这些可以提前生成,直接挂载
  3. confFiles // 配置文件

然后具体看看 Hadoop 的配置文件是如何通过 configMap 挂载到 Driver Pod 上的。

  1. override def configurePod(original: SparkPod): SparkPod = {
  2. original.transform { case pod if hasHadoopConf =>
  3. // 如果有环境变量,就从环境变量指定的路径获取
  4. val confVolume = if (confDir.isDefined) {
  5. val keyPaths = confFiles.map { file =>
  6. new KeyToPathBuilder()
  7. .withKey(file.getName())
  8. .withPath(file.getName())
  9. .build()
  10. }
  11. new VolumeBuilder()
  12. .withName(HADOOP_CONF_VOLUME)
  13. .withNewConfigMap()
  14. .withName(newConfigMapName)
  15. .withItems(keyPaths.asJava)
  16. .endConfigMap()
  17. .build()
  18. } else {
  19. // 没有环境变量的话,就直接用存在的 configMap
  20. new VolumeBuilder()
  21. .withName(HADOOP_CONF_VOLUME)
  22. .withNewConfigMap()
  23. .withName(existingConfMap.get)
  24. .endConfigMap()
  25. .build()
  26. }
  27. // 修改 Pod,通过 editSpec 方法
  28. val podWithConf = new PodBuilder(pod.pod)
  29. .editSpec()
  30. .addNewVolumeLike(confVolume)
  31. .endVolume()
  32. .endSpec()
  33. .build()
  34. // Container Mount 需要的 Volume
  35. val containerWithMount = new ContainerBuilder(pod.container)
  36. .addNewVolumeMount()
  37. .withName(HADOOP_CONF_VOLUME)
  38. .withMountPath(HADOOP_CONF_DIR_PATH)
  39. .endVolumeMount()
  40. .addNewEnv()
  41. .withName(ENV_HADOOP_CONF_DIR)
  42. .withValue(HADOOP_CONF_DIR_PATH)
  43. .endEnv()
  44. .build()
  45. SparkPod(podWithConf, containerWithMount)
  46. }
  47. }

2.10 KerberosConfDriverFeatureStep

这是关于 Kerberos 配置的 Step。

  1. /**
  2. * Provide kerberos / service credentials to the Spark driver.
  3. *
  4. * There are three use cases, in order of precedence:
  5. * Kerberos 的服务,有三种场景
  6. *
  7. * - keytab: if a kerberos keytab is defined, it is provided to the driver, and the driver will
  8. * manage the kerberos login and the creation of delegation tokens.
  9. * - existing tokens: if a secret containing delegation tokens is provided, it will be mounted
  10. * on the driver pod, and the driver will handle distribution of those tokens to executors.
  11. * - tgt only: if Hadoop security is enabled, the local TGT will be used to create delegation
  12. * tokens which will be provided to the driver. The driver will handle distribution of the
  13. * tokens to executors.
  14. */
  15. principal // 指的是 KDC 中账号的 Principal
  16. keytab // 指的是 Kerberos 生成的 Keytab
  17. existingSecretName // 存在的 secret name
  18. existingSecretItemKey // secret 中的 item key
  19. krb5File // Kerberos 服务的配置文件
  20. krb5CMap // krb5 的 configMap
  21. hadoopConf // 多余?
  22. delegationTokens // Hadoop 体系中的轻量级认证 DT

生成 token 的关键代码如下。

  1. private lazy val delegationTokens: Array[Byte] = {
  2. // 如果 keytab 和 secret 都是空的,就去生成 DT
  3. if (keytab.isEmpty && existingSecretName.isEmpty) {
  4. val tokenManager = new HadoopDelegationTokenManager(kubernetesConf.sparkConf,
  5. SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf), null)
  6. val creds = UserGroupInformation.getCurrentUser().getCredentials()
  7. tokenManager.obtainDelegationTokens(creds)
  8. // If no tokens and no secrets are stored in the credentials, make sure nothing is returned,
  9. // to avoid creating an unnecessary secret.
  10. if (creds.numberOfTokens() > || creds.numberOfSecretKeys() > ) {
  11. SparkHadoopUtil.get.serialize(creds)
  12. } else {
  13. null
  14. }
  15. } else {
  16. null
  17. }
  18. }

2.11 PodTemplateConfigMapStep

可以指定 Executor 的 Pod 的模板 spark.kubernetes.executor.podTemplateFile。所以这个 Step 主要就是来解析这个 Pod Template 的。

3 Summary

可以看到 Driver 的构建是通过多个 feature 的配置来组装起来的,终都会通过 K8S 的 Java 客户端来跟 ApiServer 交互来在 K8S 集群中生成 Driver。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/400379
推荐阅读
相关标签
  

闽ICP备14008679号