Spark Kubernetes Source Code Analysis Series - submit

1 Overview

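Kubernetes is integrated into Spark as a new resource manager, following much the same approach as the YARN integration. Spark's own built-in Standalone resource-management mode is, of course, not enough on its own.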

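The integration itself is easy to understand: Spark runs an HTTP client (the fabric8 Kubernetes client) that talks to the Kubernetes API server and hands it the application's configuration, for example how to create the Driver and Executor Pods. The same client is also used to watch those Pods.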
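To make the idea concrete, the sketch below only builds (and does not send) the REST call that ends up creating a driver Pod, using the JDK's `java.net.http.HttpRequest` rather than the fabric8 client Spark actually uses. `CreatePodRequestSketch`, the API server address, the namespace and the tiny Pod manifest are illustrative assumptions; `POST /api/v1/namespaces/{namespace}/pods` is the standard Kubernetes core/v1 endpoint for creating a Pod. Authentication, which a real client must add, is left out.

```scala
import java.net.URI
import java.net.http.HttpRequest

object CreatePodRequestSketch {
  // Build (but do not send) a "create Pod" request against the Kubernetes core/v1 REST API.
  // apiServer, namespace and podJson are illustrative inputs, not Spark settings.
  def build(apiServer: String, namespace: String, podJson: String): HttpRequest =
    HttpRequest.newBuilder(URI.create(s"$apiServer/api/v1/namespaces/$namespace/pods"))
      .header("Content-Type", "application/json")
      .POST(HttpRequest.BodyPublishers.ofString(podJson))
      .build()

  // A deliberately tiny driver Pod manifest; real driver Pods carry many more fields.
  def minimalDriverPod(name: String, image: String): String =
    s"""{"apiVersion":"v1","kind":"Pod","metadata":{"name":"$name"},""" +
      s""""spec":{"containers":[{"name":"spark-kubernetes-driver","image":"$image"}]}}"""
}
```

Sending it would take an `HttpClient` plus credentials; the fabric8 client hides this plumbing, together with watches, behind calls such as `kubernetesClient.pods().create(...)` and `.watch(...)` seen later in this article.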

2 Source Code Analysis

The Spark Kubernetes module does not contain much code; I recommend running tree in the following directory to get a quick overview.

  # path
  path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark
  ➜ spark git:(master) ✗ tree -d -L 3
  .
  ├── deploy
  │   └── k8s
  │       ├── features   // steps that configure the Driver/Executor, ConfigMap, Secret, etc.
  │       └── submit     // submit-related code
  └── scheduler
      └── cluster
          └── k8s        // executor Pod scheduling, status tracking, etc.

The code structure is quite clear: one part is related to deploy, the other to the scheduler.

This article focuses on the following submit-related code.

  /path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit
  ├── K8sSubmitOps.scala                 // spark-submit operations on running apps (--kill, --status)
  ├── KubernetesClientApplication.scala  // wrapper around spark submit, the K8S entry point
  ├── KubernetesDriverBuilder.scala      // Driver builder
  ├── LoggingPodStatusWatcher.scala      // watcher for the Spark Pod status
  └── MainAppResource.scala              // resource definitions for Java/Python/R applications

Next, let's look at the entry class of Spark's K8S mode.

  private[spark] class KubernetesClientApplication extends SparkApplication {
    override def start(args: Array[String], conf: SparkConf): Unit = {
      val parsedArguments = ClientArguments.fromCommandLineArgs(args)
      run(parsedArguments, conf)
    }
    private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
      val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
      val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
      val kubernetesConf = KubernetesConf.createDriverConf(
        sparkConf,
        kubernetesAppId,
        clientArguments.mainAppResource,
        clientArguments.mainClass,
        clientArguments.driverArgs)
      val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
      val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
      val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)
      Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
        master,
        Some(kubernetesConf.namespace),
        KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
        SparkKubernetesClientFactory.ClientType.Submission,
        sparkConf, None, None)) { kubernetesClient =>
        val client = new Client(
          kubernetesConf,
          new KubernetesDriverBuilder(),
          kubernetesClient,
          waitForAppCompletion,
          watcher)
        client.run()
      }
    }
  }

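This is the main class of Spark on K8S; the part to focus on is the run() method. It first generates a kubernetesAppId. Why not use the Spark app name? Because this App identifier is attached as a Label to every resource belonging to the App, including the Driver/Executor Pods, ConfigMap, Secret, and so on. A UUID-based ID is unique per submission, whereas two applications can easily share the same name.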
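A minimal sketch of the ID generation in run() next to a check of the Kubernetes label-value syntax (at most 63 characters; alphanumerics plus `-`, `_` and `.`, beginning and ending with an alphanumeric). The object name and the regex encoding of that rule are my own, not Spark code. It shows that the generated ID is always a legal label value, while a free-form app name such as "My Spark App" is not.

```scala
import java.util.UUID

object AppIdSketch {
  // Same shape as kubernetesAppId in run(): "spark-" followed by a UUID with the dashes removed
  def newAppId(): String = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"

  // Kubernetes label value rule: empty, or alphanumeric at both ends with '-', '_', '.' inside
  private val LabelValue = "(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?".r

  // Also enforce the 63-character length limit
  def isValidLabelValue(v: String): Boolean =
    v.length <= 63 && LabelValue.pattern.matcher(v).matches()
}
```

The generated ID is always 38 characters ("spark-" plus 32 hex digits), comfortably within the limit.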

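Note the WAIT_FOR_APP_COMPLETION setting (spark.kubernetes.submission.waitAppCompletion), whose default is true. In cluster mode it controls whether the launcher process waits for the App to finish before exiting; if set to false, the launcher process exits right after the submission. It also decides whether the watcher gets a periodic logging interval.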
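The sketch below mirrors how run() derives the logging interval from this flag, reading a plain Map instead of a SparkConf. The key names `spark.kubernetes.submission.waitAppCompletion` (default `true`) and `spark.kubernetes.report.interval` (default `1s`) are, to my knowledge, the ones behind WAIT_FOR_APP_COMPLETION and REPORT_INTERVAL; check Config.scala for your Spark version. `WaitConfigSketch` is hypothetical, and its duration parser is a deliberately tiny stand-in that only understands `ms` and `s` suffixes.

```scala
object WaitConfigSketch {
  val WaitKey = "spark.kubernetes.submission.waitAppCompletion" // default "true"
  val IntervalKey = "spark.kubernetes.report.interval"          // default "1s"

  // Tiny duration parser for this sketch only: "<n>ms", "<n>s" or a bare number of ms
  private def toMillis(s: String): Long =
    if (s.endsWith("ms")) s.dropRight(2).trim.toLong
    else if (s.endsWith("s")) s.dropRight(1).trim.toLong * 1000L
    else s.trim.toLong

  // Mirrors `if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None`
  def loggingInterval(conf: Map[String, String]): Option[Long] = {
    val wait = conf.getOrElse(WaitKey, "true").toBoolean
    if (wait) Some(toMillis(conf.getOrElse(IntervalKey, "1s"))) else None
  }
}
```

Passing `--conf spark.kubernetes.submission.waitAppCompletion=false` gives a fire-and-forget submission; with no interval, the watcher never schedules periodic status logs.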

  private[spark] class Client(
      conf: KubernetesDriverConf,
      builder: KubernetesDriverBuilder,
      kubernetesClient: KubernetesClient,
      waitForAppCompletion: Boolean,
      watcher: LoggingPodStatusWatcher) extends Logging {
    def run(): Unit = {
      // the driver Pod spec is produced here
      val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)
      // the ConfigMap is built here
      val configMapName = s"${conf.resourceNamePrefix}-driver-conf-map"
      val configMap = buildConfigMap(configMapName, resolvedDriverSpec.systemProperties)
      // Driver container: point SPARK_CONF_DIR at the mounted ConfigMap volume
      val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.pod.container)
        .addNewEnv()
          .withName(ENV_SPARK_CONF_DIR)
          .withValue(SPARK_CONF_DIR_INTERNAL)
          .endEnv()
        .addNewVolumeMount()
          .withName(SPARK_CONF_VOLUME)
          .withMountPath(SPARK_CONF_DIR_INTERNAL)
          .endVolumeMount()
        .build()
      // Driver Pod: add the container and a volume backed by the ConfigMap
      val resolvedDriverPod = new PodBuilder(resolvedDriverSpec.pod.pod)
        .editSpec()
          .addToContainers(resolvedDriverContainer)
          .addNewVolume()
            .withName(SPARK_CONF_VOLUME)
            .withNewConfigMap()
              .withName(configMapName)
              .endConfigMap()
            .endVolume()
          .endSpec()
        .build()
      Utils.tryWithResource(
        kubernetesClient
          .pods()
          .withName(resolvedDriverPod.getMetadata.getName)
          .watch(watcher)) { _ =>
        val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
        try {
          // including configMap
          val otherKubernetesResources =
            resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
          addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
          kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
        } catch {
          case NonFatal(e) =>
            kubernetesClient.pods().delete(createdDriverPod)
            throw e
        }
        val sId = s"${Option(conf.namespace).map(_ + ":").getOrElse("")}" +
          s"${resolvedDriverPod.getMetadata.getName}"
        if (waitForAppCompletion) {
          logInfo(s"Waiting for application ${conf.appName} with submission ID ${sId} to finish...")
          // block until the watcher sees the driver Pod finish
          watcher.awaitCompletion()
          logInfo(s"Application ${conf.appName} with submission ID ${sId} finished.")
        } else {
          logInfo(s"Deployed Spark application ${conf.appName} with " +
            s"submission ID ${sId} into Kubernetes.")
        }
      }
    }
    // Kubernetes owner references: the ConfigMap, Secrets and other resources (and the
    // executor Pods, which get the same reference) point at the driver Pod, so once the
    // driver Pod is deleted Kubernetes garbage-collects all of them
    private def addDriverOwnerReference(driverPod: Pod, resources: Seq[HasMetadata]): Unit = {
      val driverPodOwnerReference = new OwnerReferenceBuilder()
        .withName(driverPod.getMetadata.getName)
        .withApiVersion(driverPod.getApiVersion)
        .withUid(driverPod.getMetadata.getUid)
        .withKind(driverPod.getKind)
        .withController(true)
        .build()
      // attach the owner reference to every resource
      resources.foreach { resource =>
        val originalMetadata = resource.getMetadata
        originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
      }
    }
    // build the ConfigMap carrying the resolved configuration (Spark conf etc.)
    // as a Java properties file
    private def buildConfigMap(configMapName: String, conf: Map[String, String]): ConfigMap = {
      // serialize the key/value pairs with java.util.Properties
      val properties = new Properties()
      conf.foreach { case (k, v) =>
        properties.setProperty(k, v)
      }
      val propertiesWriter = new StringWriter()
      properties.store(propertiesWriter,
        s"Java properties built from Kubernetes config map with name: $configMapName")
      new ConfigMapBuilder()
        .withNewMetadata()
          .withName(configMapName)
          .endMetadata()
        .addToData(SPARK_CONF_FILE_NAME, propertiesWriter.toString)
        .build()
    }
  }
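The ConfigMap payload is just a Java properties file. This standalone sketch repeats the serialization done in buildConfigMap with `java.util.Properties` and then loads the text back, a round trip that approximates what happens when the driver reads the mounted file. `ConfigMapDataSketch` and the sample keys are illustrative; the Kubernetes objects themselves (ConfigMapBuilder, owner references) are left out so it runs without fabric8 on the classpath.

```scala
import java.io.{StringReader, StringWriter}
import java.util.Properties

object ConfigMapDataSketch {
  // Same idea as buildConfigMap: turn the resolved conf into one properties-file string
  def render(configMapName: String, conf: Map[String, String]): String = {
    val properties = new Properties()
    conf.foreach { case (k, v) => properties.setProperty(k, v) }
    val writer = new StringWriter()
    properties.store(writer,
      s"Java properties built from Kubernetes config map with name: $configMapName")
    writer.toString
  }

  // Read the text back into key/value pairs (comment and timestamp lines are skipped)
  def parse(text: String): Map[String, String] = {
    val properties = new Properties()
    properties.load(new StringReader(text))
    properties.stringPropertyNames().toArray(Array.empty[String])
      .map(k => k -> properties.getProperty(k))
      .toMap
  }
}
```

Note that `Properties.store` escapes characters such as `:` and `=` in values (a master URL becomes `k8s\://...`), and `load` undoes that, so the round trip is lossless.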

Next, let's see what goes into configuring the Driver Pod.

  private[spark] class KubernetesDriverBuilder {
    def buildFromFeatures(
        conf: KubernetesDriverConf,
        client: KubernetesClient): KubernetesDriverSpec = {
      val initialPod = conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
        .map { file =>
          // Since Spark 3.0 a Pod template file can be supplied. It only provides the
          // starting Pod: the feature steps below run on top of it, so values that Spark
          // sets itself overwrite the template's
          KubernetesUtils.loadPodFromTemplate(
            client,
            new File(file),
            conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME))
        }
        .getOrElse(SparkPod.initialPod())
      // Key point: the ordered list of steps that configure the Pod
      val features: Seq[KubernetesFeatureConfigStep] = Seq(
        new BasicDriverFeatureStep(conf),
        new DriverKubernetesCredentialsFeatureStep(conf),
        new DriverServiceFeatureStep(conf),
        new MountSecretsFeatureStep(conf),
        new EnvSecretsFeatureStep(conf),
        new LocalDirsFeatureStep(conf),
        new MountVolumesFeatureStep(conf),
        new DriverCommandFeatureStep(conf),
        new HadoopConfDriverFeatureStep(conf),
        new KerberosConfDriverFeatureStep(conf),
        new PodTemplateConfigMapStep(conf))
      val spec = KubernetesDriverSpec(
        initialPod,
        driverKubernetesResources = Seq.empty,
        conf.sparkConf.getAll.toMap)
      features.foldLeft(spec) { case (spec, feature) =>
        val configuredPod = feature.configurePod(spec.pod)
        val addedSystemProperties = feature.getAdditionalPodSystemProperties()
        val addedResources = feature.getAdditionalKubernetesResources()
        KubernetesDriverSpec(
          configuredPod,
          spec.driverKubernetesResources ++ addedResources,
          spec.systemProperties ++ addedSystemProperties)
      }
    }
  }
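The foldLeft is the heart of the builder: each step sees the Pod produced by the previous steps and may add system properties and extra Kubernetes resources. Here is a stripped-down, self-contained model of that pattern; `ToyPod`, `ToySpec` and the two toy steps are hypothetical stand-ins for SparkPod, KubernetesDriverSpec and real feature steps, and the labels and property they set are only illustrative. It also shows how precedence works when the starting Pod comes from a template: a label the template sets is overwritten by a later step, while labels the steps don't touch survive.

```scala
object FeatureStepSketch {
  // Hypothetical, much-reduced stand-ins for SparkPod and KubernetesDriverSpec
  final case class ToyPod(labels: Map[String, String], env: Map[String, String])
  final case class ToySpec(pod: ToyPod, resources: Seq[String], systemProperties: Map[String, String])

  // Hypothetical stand-in for KubernetesFeatureConfigStep
  trait ToyFeatureStep {
    def configurePod(pod: ToyPod): ToyPod
    def additionalSystemProperties: Map[String, String] = Map.empty
    def additionalResources: Seq[String] = Seq.empty
  }

  // Loosely modeled on BasicDriverFeatureStep: sets labels on the Pod
  class BasicStep(appId: String) extends ToyFeatureStep {
    def configurePod(pod: ToyPod): ToyPod =
      pod.copy(labels = pod.labels ++ Map("spark-role" -> "driver", "app-id" -> appId))
  }

  // Loosely modeled on DriverServiceFeatureStep: adds a Service and a system property
  class ServiceStep(serviceName: String) extends ToyFeatureStep {
    def configurePod(pod: ToyPod): ToyPod = pod
    override def additionalSystemProperties: Map[String, String] =
      Map("spark.driver.host" -> serviceName)
    override def additionalResources: Seq[String] = Seq(s"Service/$serviceName")
  }

  // Same fold as buildFromFeatures: later steps build on (and may override) earlier results
  def build(initialPod: ToyPod, conf: Map[String, String], steps: Seq[ToyFeatureStep]): ToySpec =
    steps.foldLeft(ToySpec(initialPod, Seq.empty, conf)) { (spec, step) =>
      ToySpec(
        step.configurePod(spec.pod),
        spec.resources ++ step.additionalResources,
        spec.systemProperties ++ step.additionalSystemProperties)
    }
}
```

Keeping each step a small, independent function of the previous spec is what lets Spark add features (secrets, volumes, Kerberos) by appending to the list rather than editing one large builder.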

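Next, the Pod status watcher. The Pod state itself arrives as watch events pushed by the API server; in addition, the watcher starts a background single-threaded scheduler that logs the latest known Pod status at the configured interval.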

  private[k8s] class LoggingPodStatusWatcherImpl(
      appId: String,
      maybeLoggingInterval: Option[Long]) extends LoggingPodStatusWatcher with Logging {
    private val podCompletedFuture = new CountDownLatch(1)
    // start timer for periodic logging
    private val scheduler =
      ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
    private val logRunnable: Runnable = () => logShortStatus()
    private var pod = Option.empty[Pod]
    private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
    def start(): Unit = {
      maybeLoggingInterval.foreach { interval =>
        scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
      }
    }
    // Called asynchronously for each watch event: DELETED/ERROR end the watch at once,
    // otherwise check whether the Pod has Succeeded or Failed
    override def eventReceived(action: Action, pod: Pod): Unit = {
      this.pod = Option(pod)
      action match {
        case Action.DELETED | Action.ERROR =>
          closeWatch()
        case _ =>
          logLongStatus()
          if (hasCompleted()) {
            closeWatch()
          }
      }
    }
    // ... remaining members (logShortStatus, logLongStatus, hasCompleted, closeWatch,
    // awaitCompletion, ...) omitted
  }
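The watcher combines two mechanisms: watch events update the state, and a CountDownLatch lets Client.run() block in awaitCompletion() until the Pod reaches a terminal phase. The sketch below reproduces that structure without fabric8 types. `PodPhaseWatcherSketch` is hypothetical: Pods are reduced to their phase string, actions to the names ADDED/MODIFIED/DELETED/ERROR, and awaitCompletion takes a timeout, which is an addition for testability.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

// Hypothetical, simplified model of LoggingPodStatusWatcherImpl
class PodPhaseWatcherSketch(maybeLoggingIntervalMs: Option[Long]) {
  private val podCompleted = new CountDownLatch(1)
  @volatile private var phase: String = "unknown"

  // One daemon thread, similar in spirit to ThreadUtils.newDaemonSingleThreadScheduledExecutor
  private val daemonFactory: ThreadFactory = (r: Runnable) => {
    val t = new Thread(r, "logging-pod-status-watcher")
    t.setDaemon(true)
    t
  }
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(daemonFactory)

  // Periodic logging only reports the last phase seen; it does not query the API server
  private val logRunnable: Runnable = () => println(s"application status: phase=$phase")

  def currentPhase: String = phase

  def start(): Unit = maybeLoggingIntervalMs.foreach { interval =>
    scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
  }

  // Event callback: terminal events or terminal phases release the latch
  def eventReceived(action: String, newPhase: String): Unit = {
    phase = newPhase
    action match {
      case "DELETED" | "ERROR" => closeWatch()
      case _ => if (hasCompleted) closeWatch()
    }
  }

  def hasCompleted: Boolean = phase == "Succeeded" || phase == "Failed"

  private def closeWatch(): Unit = {
    podCompleted.countDown()
    scheduler.shutdown()
  }

  // Returns true if the Pod finished before the timeout elapsed
  def awaitCompletion(timeout: Long, unit: TimeUnit): Boolean = podCompleted.await(timeout, unit)
}
```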

Finally, let's look at some newer features, for example using spark-submit to kill an entire App:

  spark-submit --kill 'dbyin:spark-hdfs-*' --master k8s://https://kubernetes.default.svc --conf spark.kubernetes.namespace=dbyin

  private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation
    with CommandLineLoggingUtils {
    private def isGlob(name: String): Boolean = {
      name.last == '*'
    }
    def execute(submissionId: String, sparkConf: SparkConf, op: K8sSubmitOp): Unit = {
      val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
      submissionId.split(":", 2) match {
        case Array(part1, part2@_*) =>
          val namespace = if (part2.isEmpty) None else Some(part1)
          val pName = if (part2.isEmpty) part1 else part2.headOption.get
          Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
            master,
            namespace,
            KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
            SparkKubernetesClientFactory.ClientType.Submission,
            sparkConf,
            None,
            None)
          ) { kubernetesClient =>
            implicit val client: KubernetesClient = kubernetesClient
            if (isGlob(pName)) {
              val ops = namespace match {
                case Some(ns) =>
                  kubernetesClient
                    .pods
                    .inNamespace(ns)
                case None =>
                  kubernetesClient
                    .pods
              }
              val pods = ops
                .list()
                .getItems
                .asScala
                .filter { pod =>
                  val meta = pod.getMetadata
                  meta.getName.startsWith(pName.stripSuffix("*")) &&
                    meta.getLabels.get(SPARK_ROLE_LABEL) == SPARK_POD_DRIVER_ROLE
                }.toList
              op.executeOnGlob(pods, namespace, sparkConf)
            } else {
              op.executeOnPod(pName, namespace, sparkConf)
            }
          }
        case _ =>
          printErrorAndExit(s"Submission ID: {$submissionId} is invalid.")
      }
    }
    // kills the Spark App(s) matching the submission ID
    override def kill(submissionId: String, conf: SparkConf): Unit = {
      printMessage(s"Submitting a request to kill submission " +
        s"${submissionId} in ${conf.get("spark.master")}. " +
        s"Grace period in secs: ${getGracePeriod(conf).getOrElse("not set.")}")
      execute(submissionId, conf, new KillApplication)
    }
    // prints the status of the matching App(s)
    override def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit = {
      printMessage(s"Submitting a request for the status of submission" +
        s" ${submissionId} in ${conf.get("spark.master")}.")
      execute(submissionId, conf, new ListStatus)
    }
    override def supports(master: String): Boolean = {
      master.startsWith("k8s://")
    }
  }
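Submission IDs have the form `namespace:driverPodName`, matching the sId that Client.run() logs, with an optional trailing `*` for a prefix glob. The sketch below isolates that parsing and matching logic. `SubmissionIdSketch` and `Target` are hypothetical names; the real code additionally filters on the driver role label (SPARK_ROLE_LABEL), and the empty-name check is an addition (in the original, an empty name would make `name.last` throw).

```scala
object SubmissionIdSketch {
  // Hypothetical result type; K8SSparkSubmitOperation just uses local vals
  final case class Target(namespace: Option[String], podName: String) {
    def isGlob: Boolean = podName.last == '*'
  }

  // Same split rule as execute(): "ns:name" or just "name"; only the first ':' separates
  def parse(submissionId: String): Option[Target] =
    submissionId.split(":", 2) match {
      case Array(name) if name.nonEmpty => Some(Target(None, name))
      case Array(ns, name) if name.nonEmpty => Some(Target(Some(ns), name))
      case _ => None
    }

  // Glob: prefix match on driver Pod names; otherwise an exact name match
  def matching(target: Target, driverPodNames: Seq[String]): Seq[String] =
    if (target.isGlob) driverPodNames.filter(_.startsWith(target.podName.stripSuffix("*")))
    else driverPodNames.filter(_ == target.podName)
}
```

The same ID format works with `spark-submit --status`, which routes to printSubmissionStatus instead of kill.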

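Statement: This article was contributed by a user and does not represent the position of 【wpsshop博客】 (the wpsshop blog); copyright belongs to the original author, and this site assumes no related legal liability. If you find infringing content, please contact us. When reposting, please cite the source: https://www.wpsshop.cn/w/花生_TL007/article/detail/426539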