Without further ado, this article covers the last component initialized by SparkContext: ContextCleaner, the context cleaner. As the name suggests, it plays the role of a garbage collector within Spark Core, so although we rarely encounter it in day-to-day coding, it is something of an unsung hero. If you already have some knowledge of Java GC, the material in this article should be easy to follow.
Code #17.1 - Initializing ContextCleaner in the SparkContext constructor
_cleaner =
  if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
    Some(new ContextCleaner(this))
  } else {
    None
  }
_cleaner.foreach(_.start())
Initializing ContextCleaner is very simple: it depends only on the SparkContext itself, and whether it is enabled is controlled by the spark.cleaner.referenceTracking configuration item, which defaults to true.
Code #17.2 - Member fields of the ContextCleaner class
private val referenceBuffer =
  Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)

private val referenceQueue = new ReferenceQueue[AnyRef]

private val listeners = new ConcurrentLinkedQueue[CleanerListener]()

private val cleaningThread = new Thread() { override def run() { keepCleaning() }}

private val periodicGCService: ScheduledExecutorService =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")

private val periodicGCInterval =
  sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min")

private val blockOnCleanupTasks = sc.conf.getBoolean(
  "spark.cleaner.referenceTracking.blocking", true)

private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
  "spark.cleaner.referenceTracking.blocking.shuffle", false)

@volatile private var stopped = false

private def blockManagerMaster = sc.env.blockManager.master
private def broadcastManager = sc.env.broadcastManager
private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]

Most of these members are self-explanatory: referenceBuffer holds the registered CleanupTaskWeakReference instances so they are not collected prematurely, referenceQueue is the ReferenceQueue that weakly reachable referents are reported to, listeners holds the registered CleanerListeners, cleaningThread is the background thread running keepCleaning(), periodicGCService periodically triggers GC every periodicGCInterval, and the blockOn* flags decide whether cleanup calls block. The remaining three are the corresponding instances of BlockManagerMaster, BroadcastManager, and MapOutputTrackerMaster obtained from SparkEnv; they will be used in the cleanup steps later on.
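As a quick aside, all of the configuration items appearing in codes #17.1 and #17.2 can be adjusted through SparkConf before the SparkContext (and thus the ContextCleaner) is created. The snippet below is only an illustrative sketch; the values are arbitrary examples, not recommendations.

import org.apache.spark.SparkConf

// Illustrative values only; every key below appears in code #17.1/#17.2.
val conf = new SparkConf()
  .set("spark.cleaner.referenceTracking", "true")                   // enable ContextCleaner (the default)
  .set("spark.cleaner.periodicGC.interval", "10min")                // trigger the periodic GC more often
  .set("spark.cleaner.referenceTracking.blocking", "true")          // block on RDD/broadcast/accumulator cleanup
  .set("spark.cleaner.referenceTracking.blocking.shuffle", "false") // do not block on shuffle cleanup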
ContextCleaner defines five kinds of cleanup tasks, corresponding to RDDs, shuffles, broadcast variables, accumulators, and checkpoints. They all extend the empty sealed trait CleanupTask, and their definitions are extremely simple, as shown below.
Code #17.3 - o.a.s.CleanupTask and its subclasses
private sealed trait CleanupTask
private case class CleanRDD(rddId: Int) extends CleanupTask
private case class CleanShuffle(shuffleId: Int) extends CleanupTask
private case class CleanBroadcast(broadcastId: Long) extends CleanupTask
private case class CleanAccum(accId: Long) extends CleanupTask
private case class CleanCheckpoint(rddId: Int) extends CleanupTask
The CleanupTaskWeakReference mentioned in the previous article is defined as follows. When the referent object it holds becomes weakly reachable, the corresponding CleanupTaskWeakReference instance is enqueued into the ReferenceQueue, where it will be picked up to execute the cleanup task.
Code #17.4 - The o.a.s.CleanupTaskWeakReference class
private class CleanupTaskWeakReference(
    val task: CleanupTask,
    referent: AnyRef,
    referenceQueue: ReferenceQueue[AnyRef])
  extends WeakReference(referent, referenceQueue)
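To make the weak-reference mechanism concrete, here is a minimal, standalone sketch of the underlying JDK facility (plain Scala, not Spark code): a WeakReference registered with a ReferenceQueue is enqueued once its referent is no longer strongly reachable and has been collected. Whether the final print actually sees an enqueued reference depends on GC timing.

import java.lang.ref.{ReferenceQueue, WeakReference}

object WeakRefDemo {
  def main(args: Array[String]): Unit = {
    val queue = new ReferenceQueue[AnyRef]

    var payload: AnyRef = new Array[Byte](1 << 20)      // the referent
    val ref = new WeakReference[AnyRef](payload, queue) // registered with the queue

    payload = null                                      // drop the only strong reference
    System.gc()                                         // suggest a collection

    // If the referent was collected, its WeakReference shows up on the queue.
    val enqueued = Option(queue.remove(1000))           // wait up to 1 second
    println(s"referent still reachable: ${ref.get() != null}, enqueued: ${enqueued.isDefined}")
  }
}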
Code #17.1 already calls the ContextCleaner.start() method. It marks the cleaning thread cleaningThread as a daemon thread and starts it, then schedules System.gc() to run at intervals of periodicGCInterval, which may in turn trigger a GC. Therefore, when specifying JVM parameters for the Driver or Executors of a Spark application, never add -XX:+DisableExplicitGC, because that flag turns System.gc() calls into no-ops.
Code #17.5 - The o.a.s.ContextCleaner.start() method
def start(): Unit = {
  cleaningThread.setDaemon(true)
  cleaningThread.setName("Spark Context Cleaner")
  cleaningThread.start()
  periodicGCService.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = System.gc()
  }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
}
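Since the periodic cleanup above ultimately relies on System.gc(), the caveat about -XX:+DisableExplicitGC applies to whatever JVM options are passed to the Driver and Executors, typically through spark.driver.extraJavaOptions and spark.executor.extraJavaOptions. A small sketch follows; the GC flags shown are arbitrary examples.

import org.apache.spark.SparkConf

// Make sure -XX:+DisableExplicitGC does NOT appear in either option,
// otherwise the System.gc() scheduled in code #17.5 becomes a no-op.
val conf = new SparkConf()
  .set("spark.driver.extraJavaOptions", "-XX:+UseG1GC")
  .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")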
ContextCleaner provides the registerForCleanup() method, which adds a CleanupTask together with the object to be cleaned into the referenceBuffer set; a simplified sketch of this registration path is shown below. After that, let's look at the keepCleaning() method mentioned in code #17.2.
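The sketch below is paraphrased from the Spark source and kept deliberately minimal; the method names follow the real code, but signatures and details may differ slightly across Spark versions. Registration simply wraps the object and its CleanupTask into a CleanupTaskWeakReference bound to referenceQueue, and keeps that reference alive in referenceBuffer.

// Paraphrased from o.a.s.ContextCleaner; simplified, not the verbatim source.
def registerRDDForCleanup(rdd: RDD[_]): Unit = {
  registerForCleanup(rdd, CleanRDD(rdd.id))
}

private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
  referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
}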
Code #17.6 - The o.a.s.ContextCleaner.keepCleaning() method
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
  while (!stopped) {
    try {
      val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
        .map(_.asInstanceOf[CleanupTaskWeakReference])
      synchronized {
        reference.foreach { ref =>
          logDebug("Got cleaning task " + ref.task)
          referenceBuffer.remove(ref)
          ref.task match {
            case CleanRDD(rddId) =>
              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
            case CleanShuffle(shuffleId) =>
              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
            case CleanBroadcast(broadcastId) =>
              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
            case CleanAccum(accId) =>
              doCleanupAccum(accId, blocking = blockOnCleanupTasks)
            case CleanCheckpoint(rddId) =>
              doCleanCheckpoint(rddId)
          }
        }
      }
    } catch {
      case ie: InterruptedException if stopped => // ignore
      case e: Exception => logError("Error in cleaning thread", e)
    }
  }
}

This method takes CleanupTaskWeakReference instances out of the ReferenceQueue, pattern-matches on the CleanupTask each one carries, and calls a different method for each of the five cases. Let's look at the methods that clean up RDD and shuffle data as examples.
Code #17.7 - The o.a.s.ContextCleaner.doCleanupRDD()/doCleanupShuffle() methods
def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {
  try {
    logDebug("Cleaning RDD " + rddId)
    sc.unpersistRDD(rddId, blocking)
    listeners.asScala.foreach(_.rddCleaned(rddId))
    logInfo("Cleaned RDD " + rddId)
  } catch {
    case e: Exception => logError("Error cleaning RDD " + rddId, e)
  }
}

def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
  try {
    logDebug("Cleaning shuffle " + shuffleId)
    mapOutputTrackerMaster.unregisterShuffle(shuffleId)
    blockManagerMaster.removeShuffle(shuffleId, blocking)
    listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
    logInfo("Cleaned shuffle " + shuffleId)
  } catch {
    case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
  }
}

As we can see, cleaning up an RDD calls the SparkContext.unpersistRDD() method to unpersist it, while cleaning up a shuffle requires unregistering it from both the MapOutputTracker and the BlockManager. After the cleanup finishes, the corresponding callback of each registered listener is invoked to record the event.
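To tie the whole flow together, here is a rough, timing-dependent sketch (my own example, not from the Spark source): an RDD is cached inside a narrow scope, its last strong reference dies, a GC is suggested, and ContextCleaner may then unpersist the cached blocks automatically. Whether and when this actually happens depends on GC and scheduling timing.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CleanerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("cleaner-demo"))

    // Create and cache an RDD in a narrow scope so the reference can die afterwards.
    def buildAndUse(): Unit = {
      val rdd = sc.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY)
      rdd.count()
    } // after this returns, the driver holds no strong reference to `rdd`

    buildAndUse()
    System.gc()        // suggest a GC so the weak reference can be enqueued
    Thread.sleep(5000) // give the cleaning thread a chance to run (timing-dependent)

    // By now the cached blocks of the dead RDD may already have been unpersisted
    // by ContextCleaner; check the "Storage" tab of the UI or the driver logs.
    sc.stop()
  }
}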
This article briefly covered how ContextCleaner is initialized and started, and how its cleanup flow works.
With ContextCleaner covered, the part of this series built around SparkContext is drawing to a close. We will check whether any important topics have been missed along the way; if not, it is probably time to move on to one of the cores of Spark Core: the RDD.