当前位置:   article > 正文

Spark UI界面原理

spark的4040界面中,skipped stage

  当Spark程序在运行时,会提供一个Web页面查看Application运行状态信息。是否开启UI界面由参数spark.ui.enabled(默认为true)来确定。下面列出Spark UI一些相关配置参数,默认值,以及其作用。

参数默认值作用描述
spark.ui.enabledtrue是否开启UI界面
spark.ui.port4040(顺序探查空闲端口)UI界面的访问端口号
spark.ui.retainedJobs1000UI界面显示的Job个数
spark.ui.retailedStages1000UI界面上显示的Stage个数
spark.ui.timeline.tasks.maximum1000Stage页面显示的Tasks个数
spark.ui.killEnabledtrue是否运行页面上kill任务
spark.ui.threadDumpsEnabledtrueExecutors页面是否可以展示线程运行状况

  本文接下来分成两个部分,第一部分基于Spark-1.6.0的源码,结合第二部分的图片内容来描述UI界面在Spark中的实现方式。第二部分以实例展示Spark UI界面显示的内容。

一、Spark UI界面实现方式

1、UI组件结构

  这部分先讲UI界面的实现方式,UI界面的实例在本文最后一部分。如果对这部分中的某些概念不清楚,那么最好先把第二部分了解一下。
  从下面UI界面的实例可以看出,不同的内容以Tab的形式展现在界面上,对应每一个Tab在下方显示具体内容。基本上Spark UI界面也是按这个层次关系实现的。
  以SparkUI类为容器,各个Tab,如JobsTab, StagesTab, ExecutorsTab等镶嵌在SparkUI上,对应各个Tab,有页面内容实现类JobPage, StagePage, ExecutorsPage等页面。这些类的继承和包含关系如下图所示:
  这里写图片描述

2、初始化过程

  从上面可以看出,SparkUI类型的对象是UI界面的根对象,它是在SparkContext类中构造出来的。

  1. private var _ui: Option[SparkUI] = None //定义
  2. _ui = //SparkUI对象的生成
  3. if (conf.getBoolean("spark.ui.enabled", true)) {
  4. Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
  5. _env.securityManager, appName, startTime = startTime))
  6. } else {
  7. // For tests, do not enable the UI
  8. None
  9. }
  10. _ui.foreach(_.bind()) //启动jetty。bind方法继承自WebUI,该类负责和真实的Jetty Server API打交道
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

  上面这段代码中可以看到SparkUI对象的生成过程,结合上面的类结构图,可以看到bind方法继承自WebUI类,进入WebUI类中,

  1. protected val handlers = ArrayBuffer[ServletContextHandler]() // 这个对象在下面bind方法中会使用到。
  2. protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]] // 将page绑定到handlers上
  3. /** 将Http Server绑定到这个Web页面 */
  4. def bind() {
  5. assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className))
  6. try {
  7. serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))
  8. logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort))
  9. } catch {
  10. case e: Exception =>
  11. logError("Failed to bind %s".format(className), e)
  12. System.exit(1)
  13. }
  14. }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

上面代码中handlers对象维持了WebUIPage和Jetty之间的关系,org.eclipse.jetty.servlet.ServletContextHandler是标准jetty容器的handler。而对象pageToHandlers维持了WebUIPage到ServletContextHandler的对应关系。

  各Tab页以及该页内容的实现,基本上大同小异。接下来以AllJobsPage页面为例仔细梳理页面展示的过程。

3、SparkUI中Tab的绑定

  从上面的类结构图中看到WebUIPage提供了两个重要的方法,render和renderJson用于相应页面请求,在WebUIPage的实现类中,具体实现了这两个方法。
  在SparkContext中构造出SparkUI的实例后,会执行SparkUI#initialize方法进行初始化。如下面代码中,调用SparkUI从WebUI继承的attacheTab方法,将各Tab页面绑定到UI上。

  1. def initialize() {
  2. attachTab(new JobsTab(this))
  3. attachTab(stagesTab)
  4. attachTab(new StorageTab(this))
  5. attachTab(new EnvironmentTab(this))
  6. attachTab(new ExecutorsTab(this))
  7. attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
  8. attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
  9. attachHandler(ApiRootResource.getServletHandler(this))
  10. // This should be POST only, but, the YARN AM proxy won't proxy POSTs
  11. attachHandler(createRedirectHandler(
  12. "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
  13. httpMethods = Set("GET", "POST")))
  14. }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

4、页面内容绑定到Tab

  在上一节中,JobsTab标签绑定到SparkUI上之后,在JobsTab上绑定了AllJobsPage和JobPage类。AllJobsPage页面即访问SparkUI页面时列举出所有Job的那个页面,JobPage页面则是点击单个Job时跳转的页面。通过调用JobsTab从WebUITab继承的attachPage方法与JobsTab进行绑定。

  1. private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
  2. val sc = parent.sc
  3. val killEnabled = parent.killEnabled
  4. val jobProgresslistener = parent.jobProgressListener
  5. val executorListener = parent.executorsListener
  6. val operationGraphListener = parent.operationGraphListener
  7. def isFairScheduler: Boolean =
  8. jobProgresslistener.schedulingMode.exists(_ == SchedulingMode.FAIR)
  9. attachPage(new AllJobsPage(this))
  10. attachPage(new JobPage(this))
  11. }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

5、页面内容的展示

  知道了AllJobsPage页面如何绑定到SparkUI界面后,接下来分析这个页面的内容是如何显示的。进入AllJobsPage类,主要观察render方法。在页面展示上Spark直接利用了Scala对html/xml的语法支持,将页面的Html代码嵌入Scala程序中。具体的页面生成过程可以查看下面源码中的注释。这里可以结合第二部分的实例进行查看。

  1. def render(request: HttpServletRequest): Seq[Node] = {
  2. val listener = parent.jobProgresslistener //获取jobProgresslistener对象,页面展示的数据都是从这里读取
  3. listener.synchronized {
  4. val startTime = listener.startTime // 获取application的开始时间,默认值为-1L
  5. val endTime = listener.endTime // 获取application的结束时间,默认值为-1L
  6. val activeJobs = listener.activeJobs.values.toSeq // 获取当前application中处于active状态的job
  7. val completedJobs = listener.completedJobs.reverse.toSeq // 获取当前application中完成状态的job
  8. val failedJobs = listener.failedJobs.reverse.toSeq // 获取当前application中失败状态的job
  9. val activeJobsTable =
  10. jobsTable(activeJobs.sortBy(_.submissionTime.getOrElse(-1L)).reverse)
  11. val completedJobsTable =
  12. jobsTable(completedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
  13. val failedJobsTable =
  14. jobsTable(failedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
  15. val shouldShowActiveJobs = activeJobs.nonEmpty
  16. val shouldShowCompletedJobs = completedJobs.nonEmpty
  17. val shouldShowFailedJobs = failedJobs.nonEmpty
  18. val completedJobNumStr = if (completedJobs.size == listener.numCompletedJobs) {
  19. s"${completedJobs.size}"
  20. } else {
  21. s"${listener.numCompletedJobs}, only showing ${completedJobs.size}"
  22. }
  23. val summary: NodeSeq =
  24. <div>
  25. <ul class="unstyled">
  26. <li>
  27. <strong>Total Uptime:</strong> // 显示当前Spark应用运行时间
  28. {// 如果还没有结束,就用系统当前时间减开始时间。如果已经结束,就用结束时间减开始时间
  29. if (endTime < 0 && parent.sc.isDefined) {
  30. UIUtils.formatDuration(System.currentTimeMillis() - startTime)
  31. } else if (endTime > 0) {
  32. UIUtils.formatDuration(endTime - startTime)
  33. }
  34. }
  35. </li>
  36. <li>
  37. <strong>Scheduling Mode: </strong> // 显示调度模式,FIFO或FAIR
  38. {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
  39. </li>
  40. {
  41. if (shouldShowActiveJobs) { // 如果有active状态的job,则显示Active Jobs有多少个
  42. <li>
  43. <a href="#active"><strong>Active Jobs:</strong></a>
  44. {activeJobs.size}
  45. </li>
  46. }
  47. }
  48. {
  49. if (shouldShowCompletedJobs) { // 如果有完成状态的job,则显示Completed Jobs的个数
  50. <li id="completed-summary">
  51. <a href="#completed"><strong>Completed Jobs:</strong></a>
  52. {completedJobNumStr}
  53. </li>
  54. }
  55. }
  56. {
  57. if (shouldShowFailedJobs) { // 如果有失败状态的job,则显示Failed Jobs的个数
  58. <li>
  59. <a href="#failed"><strong>Failed Jobs:</strong></a>
  60. {listener.numFailedJobs}
  61. </li>
  62. }
  63. }
  64. </ul>
  65. </div>
  66. var content = summary // 将上面的html代码写入content变量,在最后统一显示content中的内容
  67. val executorListener = parent.executorListener // 这里获取EventTimeline中的信息
  68. content ++= makeTimeline(activeJobs ++ completedJobs ++ failedJobs,
  69. executorListener.executorIdToData, startTime)
  70. // 然后根据当前application中是否存在active, failed, completed状态的job,将这些信息显示在页面上。
  71. if (shouldShowActiveJobs) {
  72. content ++= <h4 id="active">Active Jobs ({activeJobs.size})</h4> ++
  73. activeJobsTable // 生成active状态job的展示表格,具体形式可参看第二部分。按提交时间倒序排列
  74. }
  75. if (shouldShowCompletedJobs) {
  76. content ++= <h4 id="completed">Completed Jobs ({completedJobNumStr})</h4> ++
  77. completedJobsTable
  78. }
  79. if (shouldShowFailedJobs) {
  80. content ++= <h4 id ="failed">Failed Jobs ({failedJobs.size})</h4> ++
  81. failedJobsTable
  82. }
  83. val helpText = """A job is triggered by an action, like count() or saveAsTextFile().""" +
  84. " Click on a job to see information about the stages of tasks inside it."
  85. UIUtils.headerSparkPage("Spark Jobs", content, parent, helpText = Some(helpText)) // 最后将content中的所有内容全部展示在页面上
  86. }
  87. }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94

  接下来以activeJobsTable代码为例分析Jobs信息展示表格的生成。这里主要的方法是makeRow,接收的是上面代码中的activeJobs, completedJobs, failedJobs。这三个对象都是包含在JobProgressListener对象中的,在JobProgressListener中的定义如下:

  1. // 这三个对象用于存储数据的主要是JobUIData类型,
  2. val activeJobs = new HashMap[JobId, JobUIData]
  3. val completedJobs = ListBuffer[JobUIData]()
  4. val failedJobs = ListBuffer[JobUIData]()
  • 1
  • 2
  • 3
  • 4

  将上面三个对象传入到下面这段代码中,继续执行。

  1. private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = {
  2. val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)
  3. val columns: Seq[Node] = { // 显示的信息包括,Job Id(Job Group)以及Job描述,Job提交时间,Job运行时间,总的Stage/Task数,成功的Stage/Task数,以及一个进度条
  4. <th>{if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"}</th>
  5. <th>Description</th>
  6. <th>Submitted</th>
  7. <th>Duration</th>
  8. <th class="sorttable_nosort">Stages: Succeeded/Total</th>
  9. <th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
  10. }
  11. def makeRow(job: JobUIData): Seq[Node] = {
  12. val (lastStageName, lastStageDescription) = getLastStageNameAndDescription(job)
  13. val duration: Option[Long] = {
  14. job.submissionTime.map { start => // Job运行时长为系统时间,或者结束时间减去开始时间
  15. val end = job.completionTime.getOrElse(System.currentTimeMillis())
  16. end - start
  17. }
  18. }
  19. val formattedDuration = duration.map(d => // 格式化任务运行时间,显示为a h:b m:c s格式UIUtils.formatDuration(d)).getOrElse("Unknown")
  20. val formattedSubmissionTime = // 获取Job提交时间job.submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
  21. val jobDescription = UIUtils.makeDescription(lastStageDescription, parent.basePath) // 获取任务描述
  22. val detailUrl = // 点击单个Job下面链接跳转到JobPage页面,传入参数为jobId
  23. "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)
  24. <tr id={"job-" + job.jobId}>
  25. <td sorttable_customkey={job.jobId.toString}>
  26. {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
  27. </td>
  28. <td>
  29. {jobDescription}
  30. <a href={detailUrl} class="name-link">{lastStageName}</a>
  31. </td>
  32. <td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}>
  33. {formattedSubmissionTime}
  34. </td>
  35. <td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
  36. <td class="stage-progress-cell">
  37. {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
  38. {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
  39. {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
  40. </td>
  41. <td class="progress-cell"> // 进度条
  42. {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
  43. failed = job.numFailedTasks, skipped = job.numSkippedTasks,
  44. total = job.numTasks - job.numSkippedTasks)}
  45. </td>
  46. </tr>
  47. }
  48. <table class="table table-bordered table-striped table-condensed sortable">
  49. <thead>{columns}</thead> // 显示列名
  50. <tbody>
  51. {jobs.map(makeRow)} // 调用上面的row生成方法,具体显示Job信息
  52. </tbody>
  53. </table>
  54. }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

  从上面这些代码中可以看到,Job页面显示的所有数据,都是从JobProgressListener对象中获得的。SparkUI可以理解成一个JobProgressListener对象的消费者,页面上显示的内容都是JobProgressListener内在的展现。
  在接下来一篇文章 Spark-1.6.0之Application运行信息记录器JobProgressListener中会分析运行状态数据是如何写入JobProgressListener中的。
  

二、Spark UI界面实例

  默认情况下,当一个Spark Application运行起来后,可以通过访问hostname:4040端口来访问UI界面。hostname是提交任务的Spark客户端ip地址,端口号由参数spark.ui.port(默认值4040,如果被占用则顺序往后探查)来确定。由于启动一个Application就会生成一个对应的UI界面,所以如果启动时默认的4040端口号被占用,则尝试4041端口,如果还是被占用则尝试4042,一直找到一个可用端口号为止。
  下面启动一个Spark ThriftServer服务,并用beeline命令连接该服务,提交sql语句运行。则ThriftServer对应一个Application,每个sql语句对应一个Job,按照Job的逻辑划分Stage和Task。

1、Jobs页面

这里写图片描述
  连接上该端口后,显示的就是上面的页面,也是Job的主页面。这里会显示所有Active,Completed, Cancled以及Failed状态的Job。默认情况下总共显示1000条Job信息,这个数值由参数spark.ui.retainedJobs(默认值1000)来确定。
  从上面还看到,除了Jobs选项卡之外,还可显示Stages, Storage, Enviroment, Executors, SQL以及JDBC/ODBC Server选项卡。分别如下图所示。

2、Stages页面

这里写图片描述

3、Storage页面

这里写图片描述

4、Enviroment页面

这里写图片描述

5、Executors页面

这里写图片描述

6、单个Job包含的Stages页面

这里写图片描述

7、Task页面

这里写图片描述
  

转载于:https://www.cnblogs.com/wuyida/p/6300237.html

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/990459
推荐阅读
相关标签
  

闽ICP备14008679号