赞
踩
features 包里的代码,主要是用于构建 Spark 在 K8S 中的各类资源所需要的特征,个人觉得可以理解成这些 features 就是帮你写各类 Kind 的 YAML 文件。
看看 features 包里的代码。这里面都是 Spark 在 K8S 中构建各种资源的步骤。
- /path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features
- ├── BasicDriverFeatureStep.scala
- ├── BasicExecutorFeatureStep.scala
- ├── DriverCommandFeatureStep.scala
- ├── DriverKubernetesCredentialsFeatureStep.scala
- ├── DriverServiceFeatureStep.scala
- ├── EnvSecretsFeatureStep.scala
- ├── ExecutorKubernetesCredentialsFeatureStep.scala
- ├── HadoopConfDriverFeatureStep.scala
- ├── KerberosConfDriverFeatureStep.scala
- ├── KubernetesFeatureConfigStep.scala
- ├── LocalDirsFeatureStep.scala
- ├── MountSecretsFeatureStep.scala
- ├── MountVolumesFeatureStep.scala
- └── PodTemplateConfigMapStep.scala
还记得 Spark Kubernetes 的源码分析系列 - submit 文章里提到的,在 KubernetesDriverBuilder
中,有一个 features
这个变量,这里需要 new 很多配置,也就是具体的用来配置 Pod 的一些步骤。
- val features = Seq(
- new BasicDriverFeatureStep(conf),
- new DriverKubernetesCredentialsFeatureStep(conf),
- new DriverServiceFeatureStep(conf),
- new MountSecretsFeatureStep(conf),
- new EnvSecretsFeatureStep(conf),
- new LocalDirsFeatureStep(conf),
- new MountVolumesFeatureStep(conf),
- new DriverCommandFeatureStep(conf),
- new HadoopConfDriverFeatureStep(conf),
- new KerberosConfDriverFeatureStep(conf),
- new PodTemplateConfigMapStep(conf))
下面我们按照顺序来分析一下。
类名就告诉我们,他是干嘛用的了,就是 Driver Feature 相对 Basic 的部分 feature,那么 Baisc 的 feature 包括什么呢?
- driverPodName // Driver Pod 的名字
- driverContainerImage // Driver Container
- driverCpuCores // Driver 需要的 Cpu Cores
- driverCoresRequest // Driver 的 Request Cpu Cores(K8S相关)
- driverLimitCores // Driver 的 Limit Cpu Cores(K8S相关)
- driverMemoryMiB // Driver 的内存 MiB
- overheadFactor // 这个稍后会讲到
- memoryOverheadMiB // 这个稍后会讲到
- driverMemoryWithOverheadMiB // 这个稍后会讲到
以上的参数,生成后,主要是用于配置 Pod 和 Container 的参数。这一块内容又长又臭,可以看看我写在里面的注释。
- # 一堆的 Builder
- val driverContainer = new ContainerBuilder(pod.container)
- # Container Name
- .withName(Option(pod.container.getName).getOrElse(DEFAULT_DRIVER_CONTAINER_NAME))
- # Image Name
- .withImage(driverContainerImage)
- # Image 拉取的策略
- .withImagePullPolicy(conf.imagePullPolicy)
- # Driver 的端口
- .addNewPort()
- .withName(DRIVER_PORT_NAME)
- .withContainerPort(driverPort)
- .withProtocol("TCP")
- .endPort()
- .addNewPort()
- # Block Manager 的 Port 相关配置
- .withName(BLOCK_MANAGER_PORT_NAME)
- .withContainerPort(driverBlockManagerPort)
- .withProtocol("TCP")
- .endPort()
- .addNewPort()
- # Spark UI 的端口配置
- .withName(UI_PORT_NAME)
- .withContainerPort(driverUIPort)
- .withProtocol("TCP")
- .endPort()
- .addNewEnv()
- # 一些环境变量
- .withName(ENV_SPARK_USER)
- .withValue(Utils.getCurrentUserName())
- .endEnv()
- .addAllToEnv(driverCustomEnvs.asJava)
- .addNewEnv()
- .withName(ENV_DRIVER_BIND_ADDRESS)
- .withValueFrom(new EnvVarSourceBuilder()
- .withNewFieldRef("v1", "status.podIP")
- .build())
- .endEnv()
- .editOrNewResources()
- # cpu 相关配置
- .addToRequests("cpu", driverCpuQuantity)
- .addToLimits(maybeCpuLimitQuantity.toMap.asJava)
- .addToRequests("memory", driverMemoryQuantity)
- .addToLimits("memory", driverMemoryQuantity)
- .addToLimits(driverResourceQuantities.asJava)
- .endResources()
- # 终于 build 完
- .build()
-
- val driverPod = new PodBuilder(pod.pod)
- # 如果 Pod 是存在的,表示要么修改,否则就是新增
- .editOrNewMetadata()
- # Pod 的名字
- .withName(driverPodName)
- # Pod 的 Label
- .addToLabels(conf.labels.asJava)
- .addToAnnotations(conf.annotations.asJava)
- .endMetadata()
- .editOrNewSpec()
- # Pod 的重启策略
- .withRestartPolicy("Never")
- # Pod 的 NodeSelector 特性
- .addToNodeSelector(conf.nodeSelector.asJava)
- # 拉取镜像的 Repository 密码(ru
- .addToImagePullSecrets(conf.imagePullSecrets: _*)
- .endSpec()
- .build()
此外 getAdditionalPodSystemProperties()
还需要这个方法是拉取其他的配置,比如说 spark.app.id
等等,不赘述了。
这个 Step 是用于配置 Driver 的安全认证相关的配置,一般认为就是 K8S 那一套安全认证的机制了。
- maybeMountedOAuthTokenFile // OAuthToken 文件
- maybeMountedClientKeyFile // Client Key 文件
- maybeMountedClientCertFile // Cient Cert 文件
- maybeMountedCaCertFile // Ca Cert 文件
- driverServiceAccount // Driver 的 Service Account
- oauthTokenBase64 // OauthToken Base64 编码
- caCertDataBase64 // CaCert 里面的数据 Base64 编码
- clientKeyDataBase64 // Client Key 数据的 Base64 编码
- clientCertDataBase64 // Client Cert 数据的 Base 64 编码
- shouldMountSecret // 是否需要挂载 Secret
- driverCredentialsSecretName // Driver 的认证 Secret 名
这里有很多关于访问 ApiServer 的安全认证的细节,如果不熟悉 K8S 的同学,需要补补课。下面是这个 Step 的关键方法,也就是把这些安全相关的文件通过 secret 保存下来。
- override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
- // 如果 conf 存在以上提及的一些认证文件,则会进行挂载 Secret
- if (shouldMountSecret) {
- Seq(createCredentialsSecret())
- } else {
- Seq.empty
- }
- }
这个就是配置 Driver Service 的 Step,因为 Pod 在 K8S 集群里,创建 Executor 需要不同的 Executor Pod 访问到 Driver Pod,才能注册上,也包括 Block Manager 以及 Spark UI 的端口和服务负载配置。
- preferredServiceName // Service Name
- resolvedServiceName // 上面的 Service Name 超过63个字符的话需要重新配置
- driverPort // Driver 的端口
- driverBlockManagerPort // Block Manager 的端口
- driverUIPort // Spark UI 的端口
上面的 Service Name 超过63个字符的话需要重新配置。
- private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
- preferredServiceName
- } else {
- // 超过63个字符,就是需要系统内部重置这个名字了
- val randomServiceId = KubernetesUtils.uniqueID(clock = clock)
- val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
- logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " +
- s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " +
- s"$shorterServiceName as the driver service's name.")
- shorterServiceName
- }
- resolvedLocalDirs // 本地目录
- useLocalDirTmpFs // 如果 conf 配置为 true,则表示本地目录会用其他的存储系统,例如内存,具体请看 spark.kubernetes.local.dirs.tmpfs
这是关于 Driver 命令行的一些配置,具体看看注释是怎么解释的。
- /**
- * Creates the driver command for running the user app, and propagates needed configuration so
- * executors can also find the app code.
- */
这是用于挂载 Hadoop 配置文件的 Step,例如访问 HDFS 的时候,需要 core-site.x ml,hdfs-site.xml 等等。
- confDir // Hadoop 相关的环境变量 HADOOP_CONF_DIR
- existingConfMap // spark.kubernetes.hadoop.configMapName 提交任务的 configMap 名字,这些可以提前生成,直接挂载
- confFiles // 配置文件
然后具体看看 Hadoop 的配置文件是如何通过 configMap 挂载到 Driver Pod 上的。
- override def configurePod(original: SparkPod): SparkPod = {
-
- original.transform { case pod if hasHadoopConf =>
-
- // 如果有环境变量,就从环境变量指定的路径获取
- val confVolume = if (confDir.isDefined) {
- val keyPaths = confFiles.map { file =>
- new KeyToPathBuilder()
- .withKey(file.getName())
- .withPath(file.getName())
- .build()
- }
- new VolumeBuilder()
- .withName(HADOOP_CONF_VOLUME)
- .withNewConfigMap()
- .withName(newConfigMapName)
- .withItems(keyPaths.asJava)
- .endConfigMap()
- .build()
- } else {
- // 没有环境变量的话,就直接用存在的 configMap
- new VolumeBuilder()
- .withName(HADOOP_CONF_VOLUME)
- .withNewConfigMap()
- .withName(existingConfMap.get)
- .endConfigMap()
- .build()
- }
-
- // 修改 Pod,通过 editSpec 方法
- val podWithConf = new PodBuilder(pod.pod)
- .editSpec()
- .addNewVolumeLike(confVolume)
- .endVolume()
- .endSpec()
- .build()
-
- // Container Mount 需要的 Volume
- val containerWithMount = new ContainerBuilder(pod.container)
- .addNewVolumeMount()
- .withName(HADOOP_CONF_VOLUME)
- .withMountPath(HADOOP_CONF_DIR_PATH)
- .endVolumeMount()
- .addNewEnv()
- .withName(ENV_HADOOP_CONF_DIR)
- .withValue(HADOOP_CONF_DIR_PATH)
- .endEnv()
- .build()
-
- SparkPod(podWithConf, containerWithMount)
- }
- }
这是关于 Kerberos 配置的 Step。
- /**
- * Provide kerberos / service credentials to the Spark driver.
- *
- * There are three use cases, in order of precedence:
- * Kerberos 的服务,有三种场景
- *
- * - keytab: if a kerberos keytab is defined, it is provided to the driver, and the driver will
- * manage the kerberos login and the creation of delegation tokens.
- * - existing tokens: if a secret containing delegation tokens is provided, it will be mounted
- * on the driver pod, and the driver will handle distribution of those tokens to executors.
- * - tgt only: if Hadoop security is enabled, the local TGT will be used to create delegation
- * tokens which will be provided to the driver. The driver will handle distribution of the
- * tokens to executors.
- */
- principal // 指的是 KDC 中账号的 Principal
- keytab // 指的是 Kerberos 生成的 Keytab
- existingSecretName // 存在的 secret name
- existingSecretItemKey // secret 中的 item key
- krb5File // Kerberos 服务的配置文件
- krb5CMap // krb5 的 configMap
- hadoopConf // 多余?
- delegationTokens // Hadoop 体系中的轻量级认证 DT
生成 token 的关键代码如下。
- private lazy val delegationTokens: Array[Byte] = {
- // 如果 keytab 和 secret 都是空的,就去生成 DT
- if (keytab.isEmpty && existingSecretName.isEmpty) {
- val tokenManager = new HadoopDelegationTokenManager(kubernetesConf.sparkConf,
- SparkHadoopUtil.get.newConfiguration(kubernetesConf.sparkConf), null)
- val creds = UserGroupInformation.getCurrentUser().getCredentials()
- tokenManager.obtainDelegationTokens(creds)
- // If no tokens and no secrets are stored in the credentials, make sure nothing is returned,
- // to avoid creating an unnecessary secret.
- if (creds.numberOfTokens() > || creds.numberOfSecretKeys() > ) {
- SparkHadoopUtil.get.serialize(creds)
- } else {
- null
- }
- } else {
- null
- }
- }
可以指定 Executor 的 Pod 的模板 spark.kubernetes.executor.podTemplateFile
。所以这个 Step 主要就是来解析这个 Pod Template 的。
可以看到 Driver 的构建是通过多个 feature 的配置来组装起来的,终都会通过 K8S 的 Java 客户端来跟 ApiServer 交互来在 K8S 集群中生成 Driver。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。