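Kubernetes is integrated into Spark as a new resource manager, using much the same approach as the YARN integration. Spark's own built-in resource manager, Standalone mode, is of course not enough by itself.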
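The integration itself is easy to understand. Spark starts an HTTP client that talks to the Kubernetes API Server. Through that client it submits the application's configuration, such as how to create the Driver and Executor Pods, and it also watches those Pods.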
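Spark talks to the API Server through the fabric8 Kubernetes client rather than through hand-written HTTP calls. Underneath, though, the exchange is plain REST. The sketch below uses the JDK 11 `java.net.http` API to build a request that lists the Pods in a namespace by label. It is only a rough illustration: the helper name `listPodsByLabel` and the example label are hypothetical, and the request is built but never sent.

```scala
import java.net.{URI, URLEncoder}
import java.net.http.HttpRequest
import java.nio.charset.StandardCharsets

// Hypothetical helper: builds (but does not send) a "list Pods by label" request
// against the Kubernetes REST API. Spark itself goes through the fabric8 client.
object ApiServerRequestSketch {
  def listPodsByLabel(
      apiServer: String,
      namespace: String,
      labelKey: String,
      labelValue: String): HttpRequest = {
    // label selectors travel as a query parameter of the form key=value
    val selector = URLEncoder.encode(s"$labelKey=$labelValue", StandardCharsets.UTF_8)
    val uri = URI.create(s"$apiServer/api/v1/namespaces/$namespace/pods?labelSelector=$selector")
    HttpRequest.newBuilder(uri)
      .header("Accept", "application/json")
      .GET()
      .build()
  }
}
```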
The Spark Kubernetes module does not contain much code. Running `tree` in the following directory gives a good overview:

```
# path
path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark
➜ spark git:(master) ✗ tree -d -L 3
.
├── deploy
│   └── k8s
│       ├── features   // configuration steps for Driver/Executor, ConfigMap, Secret, etc.
│       └── submit     // submission-related code
└── scheduler
    └── cluster
        └── k8s        // executor Pod scheduling, status, etc.
```
The structure is clear: one part deals with deployment (`deploy`), the other with scheduling (`scheduler`).
This article focuses on the submit-related code.
```
/path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit
├── K8sSubmitOps.scala                  // spark-submit operations (kill / status)
├── KubernetesClientApplication.scala   // wrapper around spark-submit
├── KubernetesDriverBuilder.scala       // Driver builder
├── LoggingPodStatusWatcher.scala       // watcher for the Spark Pod status
└── MainAppResource.scala               // resource definitions for Java/Python/R
```
Next, let's look at the entry class for Spark's K8S mode.
```scala
private[spark] class KubernetesClientApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val parsedArguments = ClientArguments.fromCommandLineArgs(args)
    run(parsedArguments, conf)
  }

  private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
    val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
    val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
    val kubernetesConf = KubernetesConf.createDriverConf(
      sparkConf,
      kubernetesAppId,
      clientArguments.mainAppResource,
      clientArguments.mainClass,
      clientArguments.driverArgs)
    val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
    val loggingInterval = if (waitForAppCompletion) Some(sparkConf.get(REPORT_INTERVAL)) else None
    val watcher = new LoggingPodStatusWatcherImpl(kubernetesAppId, loggingInterval)

    Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
      master,
      Some(kubernetesConf.namespace),
      SparkKubernetesClientFactory.ClientType.Submission,
      sparkConf,
      None,
      None)) { kubernetesClient =>
        val client = new Client(
          kubernetesConf,
          new KubernetesDriverBuilder(),
          kubernetesClient,
          waitForAppCompletion,
          watcher)
        client.run()
    }
  }
}
```
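This is the entry main class for Spark on K8S. The method to focus on is `run()`. It first generates a `kubernetesAppId`. Why not simply use the Spark app name? This identifier is attached as a label to every resource that belongs to the app, including the Driver/Executor Pods, the ConfigMap, Secrets and so on. It therefore has to identify this particular submission uniquely, and an app name cannot guarantee that.

`WAIT_FOR_APP_COMPLETION` (`spark.kubernetes.submission.waitAppCompletion`) defaults to true. It controls whether, in cluster mode, the launcher process waits for the application to finish before exiting. If it is set to false, the launcher process exits right after submission.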
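The ID expression below is copied from `run()`. There may be a further reason to prefer it over the app name. Kubernetes label values have a restricted format: at most 63 characters, made of alphanumerics plus `-`, `_` and `.`, and starting and ending with an alphanumeric. A free-form app name such as `My Spark App` would not qualify. `isValidLabelValue` is a hypothetical check written for this sketch. `loggingInterval` mirrors the way `run()` turns on periodic logging only when it waits for completion.

```scala
import java.util.UUID

object AppIdSketch {
  // Same expression as in KubernetesClientApplication.run():
  // "spark-" followed by a UUID with its dashes removed (38 characters in total).
  def newAppId(): String = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"

  // Hypothetical helper: Kubernetes label-value syntax check.
  private val LabelValue = "(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?".r
  def isValidLabelValue(v: String): Boolean =
    v.length <= 63 && LabelValue.pattern.matcher(v).matches()

  // Periodic status logging is only enabled when the launcher waits for the app.
  def loggingInterval(waitForAppCompletion: Boolean, reportIntervalMs: Long): Option[Long] =
    if (waitForAppCompletion) Some(reportIntervalMs) else None
}
```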
```scala
private[spark] class Client(
    conf: KubernetesDriverConf,
    builder: KubernetesDriverBuilder,
    kubernetesClient: KubernetesClient,
    waitForAppCompletion: Boolean,
    watcher: LoggingPodStatusWatcher) extends Logging {

  def run(): Unit = {
    // the driver Pod spec comes from here
    val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)
    // the ConfigMap is built here
    val configMapName = s"${conf.resourceNamePrefix}-driver-conf-map"
    val configMap = buildConfigMap(configMapName, resolvedDriverSpec.systemProperties)
    // Driver container config
    val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.pod.container)
      .addNewEnv()
        // env var name/value omitted in this excerpt
        .endEnv()
      .addNewVolumeMount()
        // volume mount name/path omitted in this excerpt
        .endVolumeMount()
      .build()
    // Driver Pod config: add the container and a volume backed by the ConfigMap
    val resolvedDriverPod = new PodBuilder(resolvedDriverSpec.pod.pod)
      .editSpec()
        .addToContainers(resolvedDriverContainer)
        .addNewVolume()
          .withNewConfigMap()
            .withName(configMapName)
            .endConfigMap()
          .endVolume()
        .endSpec()
      .build()

    Utils.tryWithResource(
      kubernetesClient
        .pods()
        .withName(resolvedDriverPod.getMetadata.getName)
        .watch(watcher)) { _ =>
      val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
      try {
        // including configMap
        val otherKubernetesResources =
          resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
        addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
        kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
      } catch {
        case NonFatal(e) =>
          // roll back: remove the driver Pod if its dependent resources fail
          kubernetesClient.pods().delete(createdDriverPod)
          throw e
      }

      val sId = s"${Option(conf.namespace).map(_ + ":").getOrElse("")}" +
        s"${resolvedDriverPod.getMetadata.getName}"
      if (waitForAppCompletion) {
        logInfo(s"Waiting for application ${conf.appName} with submission ID ${sId} to finish...")
        // block until the watcher sees the driver Pod complete
        watcher.awaitCompletion()
        logInfo(s"Application ${conf.appName} with submission ID ${sId} finished.")
      } else {
        logInfo(s"Deployed Spark application ${conf.appName} with " +
          s"submission ID ${sId} into Kubernetes.")
      }
    }
  }

  // A Kubernetes feature: resources such as the ConfigMap and Secrets (and the
  // executor Pods) are owned by the driver Pod, so when the driver Pod is
  // deleted, Kubernetes garbage-collects all of them.
  private def addDriverOwnerReference(driverPod: Pod, resources: Seq[HasMetadata]): Unit = {
    val driverPodOwnerReference = new OwnerReferenceBuilder()
      .withName(driverPod.getMetadata.getName)
      .withApiVersion(driverPod.getApiVersion)
      .withUid(driverPod.getMetadata.getUid)
      .withKind(driverPod.getKind)
      .withController(true)
      .build()
    // make the driver Pod the owner of every resource
    resources.foreach { resource =>
      val originalMetadata = resource.getMetadata
      originalMetadata.setOwnerReferences(Collections.singletonList(driverPodOwnerReference))
    }
  }

  // Build the ConfigMap that carries the driver's configuration (system properties)
  private def buildConfigMap(configMapName: String, conf: Map[String, String]): ConfigMap = {
    // serialized in java.util.Properties format
    val properties = new Properties()
    conf.foreach { case (k, v) =>
      properties.setProperty(k, v)
    }
    val propertiesWriter = new StringWriter()
    properties.store(propertiesWriter,
      s"Java properties built from Kubernetes config map with name: $configMapName")
    new ConfigMapBuilder()
      .withNewMetadata()
        .withName(configMapName)
        .endMetadata()
      .addToData(SPARK_CONF_FILE_NAME, propertiesWriter.toString)
      .build()
  }
}
```
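`buildConfigMap` stores the system properties as one properties-format text entry. The sketch below shows that serialization with the standard library only, plus the step that reads it back. The reverse function `fromPropertiesText` is hypothetical and merely stands in for however the driver consumes the mounted file. `Properties.store` escapes `:` and `=`, so `k8s://` is written as `k8s\://` in the stored text.

```scala
import java.io.{StringReader, StringWriter}
import java.util.Properties
import scala.jdk.CollectionConverters._

object ConfigMapDataSketch {
  // Same idea as Client.buildConfigMap: one properties-format text blob,
  // headed by a "#..." comment line.
  def toPropertiesText(conf: Map[String, String], configMapName: String): String = {
    val properties = new Properties()
    conf.foreach { case (k, v) => properties.setProperty(k, v) }
    val writer = new StringWriter()
    properties.store(writer,
      s"Java properties built from Kubernetes config map with name: $configMapName")
    writer.toString
  }

  // Hypothetical reverse step: parse the text back into a Scala Map.
  def fromPropertiesText(text: String): Map[String, String] = {
    val properties = new Properties()
    properties.load(new StringReader(text))
    properties.stringPropertyNames().asScala.map(k => k -> properties.getProperty(k)).toMap
  }
}
```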
Next, let's see what needs to be configured on the Driver Pod.
```scala
private[spark] class KubernetesDriverBuilder {

  def buildFromFeatures(
      conf: KubernetesDriverConf,
      client: KubernetesClient): KubernetesDriverSpec = {
    val initialPod = conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
      .map { file =>
        // Since Spark 3.0 a Pod template file can be supplied. It becomes the
        // starting Pod; the feature steps below are applied on top of it, so
        // fields that Spark manages can still override values from the template.
        KubernetesUtils.loadPodFromTemplate(
          client,
          new File(file),
          conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME))
      }
      .getOrElse(SparkPod.initialPod())

    // Key part: the ordered list of steps that configure the Pod
    val features: Seq[KubernetesFeatureConfigStep] = Seq(
      new BasicDriverFeatureStep(conf),
      new DriverKubernetesCredentialsFeatureStep(conf),
      new DriverServiceFeatureStep(conf),
      new MountSecretsFeatureStep(conf),
      new EnvSecretsFeatureStep(conf),
      new LocalDirsFeatureStep(conf),
      new MountVolumesFeatureStep(conf),
      new DriverCommandFeatureStep(conf),
      new HadoopConfDriverFeatureStep(conf),
      new KerberosConfDriverFeatureStep(conf),
      new PodTemplateConfigMapStep(conf))

    val spec = KubernetesDriverSpec(
      initialPod,
      driverKubernetesResources = Seq.empty,
      conf.sparkConf.getAll.toMap)

    // each step receives the Pod produced by the previous one; extra
    // Kubernetes resources and system properties accumulate along the way
    features.foldLeft(spec) { case (spec, feature) =>
      val configuredPod = feature.configurePod(spec.pod)
      val addedSystemProperties = feature.getAdditionalPodSystemProperties()
      val addedResources = feature.getAdditionalKubernetesResources()
      KubernetesDriverSpec(
        configuredPod,
        spec.driverKubernetesResources ++ addedResources,
        spec.systemProperties ++ addedSystemProperties)
    }
  }
}
```
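The `foldLeft` in `buildFromFeatures` follows a simple accumulate-over-steps pattern. The sketch below reproduces it with hypothetical simplified types (`SimplePod`, `SimpleDriverSpec`, `SimpleFeatureStep`) in place of the fabric8-based ones. Its two steps are made up and only loosely modeled on the real ones.

```scala
// Hypothetical, simplified stand-ins for SparkPod, KubernetesDriverSpec and
// KubernetesFeatureConfigStep; the real types wrap fabric8 Pod/Container objects.
final case class SimplePod(labels: Map[String, String])

final case class SimpleDriverSpec(
    pod: SimplePod,
    resources: Seq[String],
    systemProperties: Map[String, String])

trait SimpleFeatureStep {
  def configurePod(pod: SimplePod): SimplePod = pod
  def additionalSystemProperties: Map[String, String] = Map.empty
  def additionalResources: Seq[String] = Seq.empty
}

object FeatureFoldSketch {
  // Same shape as buildFromFeatures: each step receives the pod produced by the
  // previous step, while extra resources and system properties accumulate.
  def build(initial: SimpleDriverSpec, steps: Seq[SimpleFeatureStep]): SimpleDriverSpec =
    steps.foldLeft(initial) { (spec, step) =>
      SimpleDriverSpec(
        step.configurePod(spec.pod),
        spec.resources ++ step.additionalResources,
        // on duplicate keys the later step wins, since Map ++ keeps the right-hand value
        spec.systemProperties ++ step.additionalSystemProperties)
    }

  // Two hypothetical steps, loosely modeled on BasicDriverFeatureStep and DriverServiceFeatureStep.
  val addRoleLabel: SimpleFeatureStep = new SimpleFeatureStep {
    override def configurePod(pod: SimplePod): SimplePod =
      pod.copy(labels = pod.labels + ("spark-role" -> "driver"))
  }
  val addDriverService: SimpleFeatureStep = new SimpleFeatureStep {
    override def additionalResources: Seq[String] = Seq("Service/demo-driver-svc")
    override def additionalSystemProperties: Map[String, String] =
      Map("spark.driver.host" -> "demo-driver-svc")
  }
}
```

Step order matters in this design: a step near the end of the list sees everything the earlier steps have already done to the Pod.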
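Next, the Pod status watcher. Status changes arrive asynchronously as watch events. Separately, the watcher creates a background scheduled thread that logs the latest Pod status at the configured interval.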
```scala
private[k8s] class LoggingPodStatusWatcherImpl(
    appId: String,
    maybeLoggingInterval: Option[Long]) extends LoggingPodStatusWatcher with Logging {

  private val podCompletedFuture = new CountDownLatch(1)
  // start timer for periodic logging
  private val scheduler =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
  private val logRunnable: Runnable = () => logShortStatus()

  private var pod = Option.empty[Pod]

  private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")

  def start(): Unit = {
    maybeLoggingInterval.foreach { interval =>
      scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
    }
  }

  // Called asynchronously for each event: stop watching when the Pod is deleted,
  // the watch errors, or the Pod has Succeeded or Failed
  override def eventReceived(action: Action, pod: Pod): Unit = {
    this.pod = Option(pod)
    action match {
      case Action.DELETED | Action.ERROR =>
        closeWatch()
      case _ =>
        logLongStatus()
        if (hasCompleted()) {
          closeWatch()
        }
    }
  }

  // ... remaining members (logShortStatus, logLongStatus, hasCompleted,
  // closeWatch, awaitCompletion, ...) omitted in this excerpt
}
```
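Below is the same latch-plus-scheduler pattern with Kubernetes removed, as a hedged sketch. `onPhase` stands in for `eventReceived`. A daemon scheduled executor logs the latest phase, and a `CountDownLatch` releases `awaitCompletion` once the phase becomes `Succeeded` or `Failed`. The class and method names are hypothetical. The timeout on `awaitCompletion` exists only to make the sketch testable.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

// Hypothetical, dependency-free sketch of the LoggingPodStatusWatcherImpl pattern.
// Phase updates stand in for fabric8 watch events; no Kubernetes API is involved.
final class PhaseWatcherSketch(maybeLoggingIntervalMs: Option[Long]) {
  private val podCompleted = new CountDownLatch(1)
  private val phase = new AtomicReference[String]("unknown")

  // single daemon thread, so a forgotten scheduler never keeps the JVM alive
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "logging-pod-status-watcher")
      t.setDaemon(true)
      t
    }
  })
  private val logRunnable: Runnable = () => println(s"application status: phase=${phase.get}")

  def start(): Unit = maybeLoggingIntervalMs.foreach { interval =>
    // log immediately (initial delay 0), then every `interval` milliseconds
    scheduler.scheduleAtFixedRate(logRunnable, 0L, interval, TimeUnit.MILLISECONDS)
  }

  // Stand-in for eventReceived: record the latest phase, stop on a terminal one.
  def onPhase(newPhase: String): Unit = {
    phase.set(newPhase)
    if (newPhase == "Succeeded" || newPhase == "Failed") closeWatch()
  }

  // Release waiters and stop the periodic logging.
  def closeWatch(): Unit = {
    podCompleted.countDown()
    scheduler.shutdown()
  }

  def awaitCompletion(timeoutMs: Long): Boolean =
    podCompleted.await(timeoutMs, TimeUnit.MILLISECONDS)

  def currentPhase: String = phase.get
}
```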
Finally, a look at some newer features. For example, spark-submit can now kill the whole app:

```bash
spark-submit --kill 'dbyin:spark-hdfs-*' --master k8s://https://kubernetes.default.svc --conf spark.kubernetes.namespace=dbyin
```
```scala
private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation
  with CommandLineLoggingUtils {

  private def isGlob(name: String): Boolean = {
    name.last == '*'
  }

  def execute(submissionId: String, sparkConf: SparkConf, op: K8sSubmitOp): Unit = {
    val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
    submissionId.split(":", 2) match {
      case Array(part1, part2@_*) =>
        val namespace = if (part2.isEmpty) None else Some(part1)
        val pName = if (part2.isEmpty) part1 else part2.headOption.get
        Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
          master,
          namespace,
          SparkKubernetesClientFactory.ClientType.Submission,
          sparkConf,
          None,
          None)
        ) { kubernetesClient =>
          implicit val client: KubernetesClient = kubernetesClient
          if (isGlob(pName)) {
            val ops = namespace match {
              case Some(ns) =>
                kubernetesClient
                  .pods
                  .inNamespace(ns)
              case None =>
                kubernetesClient
                  .pods
            }
            // glob: every driver Pod whose name starts with the given prefix
            val pods = ops
              .list()
              .getItems
              .asScala
              .filter { pod =>
                val meta = pod.getMetadata
                meta.getName.startsWith(pName.stripSuffix("*")) &&
                  meta.getLabels.get(SPARK_ROLE_LABEL) == SPARK_POD_DRIVER_ROLE
              }.toList
            op.executeOnGlob(pods, namespace, sparkConf)
          } else {
            op.executeOnPod(pName, namespace, sparkConf)
          }
        }
      case _ =>
        printErrorAndExit(s"Submission ID: {$submissionId} is invalid.")
    }
  }

  // kills the Spark app
  override def kill(submissionId: String, conf: SparkConf): Unit = {
    printMessage(s"Submitting a request to kill submission " +
      s"${submissionId} in ${conf.get("spark.master")}. " +
      s"Grace period in secs: ${getGracePeriod(conf).getOrElse("not set.")}")
    execute(submissionId, conf, new KillApplication)
  }

  // shows the app's status
  override def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit = {
    printMessage(s"Submitting a request for the status of submission" +
      s" ${submissionId} in ${conf.get("spark.master")}.")
    execute(submissionId, conf, new ListStatus)
  }

  override def supports(master: String): Boolean = {
    master.startsWith("k8s://")
  }
}
```
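`Client.run()` logs the submission ID in the form `namespace:driverPodName`, and that is what `--kill` and `--status` take as input. The sketch below is a hypothetical, dependency-free version of the parsing and glob selection in `execute`, with Pods modeled as `(name, labels)` pairs. In this sketch the glob matches only driver Pods, identified by the `spark-role=driver` label. Empty pod names are rejected here, which the original code does not do explicitly.

```scala
// Hypothetical re-implementation of the submission-ID handling in
// K8SSparkSubmitOperation.execute; Pods are modeled as (name, labels) pairs.
object SubmissionIdSketch {
  final case class Target(namespace: Option[String], podName: String) {
    // a trailing '*' turns the pod name into a prefix glob
    def isGlob: Boolean = podName.endsWith("*")
  }

  // "namespace:podName" or just "podName", split with limit 2 like the original
  def parse(submissionId: String): Option[Target] =
    submissionId.split(":", 2) match {
      case Array(name) if name.nonEmpty => Some(Target(None, name))
      case Array(ns, name) if name.nonEmpty => Some(Target(Some(ns), name))
      case _ => None // treated as invalid in this sketch
    }

  // Glob selection: name-prefix match plus the spark-role=driver label,
  // so executor Pods that share the prefix are not selected.
  def selectDrivers(target: Target, pods: Seq[(String, Map[String, String])]): Seq[String] = {
    val prefix = target.podName.stripSuffix("*")
    pods.collect {
      case (name, labels) if name.startsWith(prefix) && labels.get("spark-role").contains("driver") =>
        name
    }
  }
}
```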
