当前位置:   article > 正文

Spark Core源码精读计划#17:上下文清理器ContextCleaner

context 上下文缓存清理

目录

前言

话休絮烦,本文讲解SparkContext初始化的最后一个组件——ContextCleaner,即上下文清理器。顾名思义,它扮演着Spark Core中垃圾收集器的角色,因此虽然我们在平时编码时甚少见到它,但它算是一个幕后英雄了。如果看官对Java GC的相关知识有所了解的话,本篇讲的内容应该容易理解。

初始化与类定义

SparkContext中的初始化逻辑

代码#17.1 - SparkContext构造方法中初始化ContextCleaner

  1. _cleaner =
  2. if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
  3. Some(new ContextCleaner(this))
  4. } else {
  5. None
  6. }
  7. _cleaner.foreach(_.start())

ContextCleaner的初始化非常简单,只需要依赖于SparkContext本身,由spark.cleaner.referenceTracking配置项控制是否启用,默认为true。

ContextCleaner类的属性成员

代码#17.2 - ContextCleaner类的属性成员

  1. private val referenceBuffer =
  2. Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)
  3. private val referenceQueue = new ReferenceQueue[AnyRef]
  4. private val listeners = new ConcurrentLinkedQueue[CleanerListener]()
  5. private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
  6. private val periodicGCService: ScheduledExecutorService =
  7. ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")
  8. private val periodicGCInterval =
  9. sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min")
  10. private val blockOnCleanupTasks = sc.conf.getBoolean(
  11. "spark.cleaner.referenceTracking.blocking", true)
  12. private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
  13. "spark.cleaner.referenceTracking.blocking.shuffle", false)
  14. @volatile private var stopped = false
  15. private def blockManagerMaster = sc.env.blockManager.master
  16. private def broadcastManager = sc.env.broadcastManager
  17. private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
  • referenceBuffer:缓存CleanupTaskWeakReference的集合。CleanupTaskWeakReference是Java自带WeakReference类的简单封装,其中保存有需要清理的Spark组件实例的弱引用。
  • referenceQueue:缓存弱引用实例的引用队列(java.lang.ref.ReferenceQueue类型)。对弱引用和软引用实例,当其被GC之后就会存入引用队列中,用户程序通过从队列中取得这些引用信息,就可以执行自定义的清理操作。
  • listeners:ContextCleaner的监听器队列,目前只是在测试代码中用到,没有实际用途。
  • cleaningThread:执行具体清理工作的线程,具体是调用了keepCleaning()方法。后面会讲到该方法的实现。
  • periodicGCService:一个单线程的调度线程池,用来周期性地执行GC操作。
  • periodicGCInterval:periodicGCService执行GC的周期长度,由配置项spark.cleaner.periodicGC.interval控制,默认为30分钟。
  • blockOnCleanupTasks:执行清理任务的时候是否阻塞(不包含Shuffle数据的清理任务),由配置项spark.cleaner.referenceTracking.blocking控制,默认值true。
  • blockOnShuffleCleanupTasks:执行清理Shuffle数据的任务时是否阻塞,由配置项spark.cleaner.referenceTracking.blocking.shuffle控制,默认值false。
  • stopped:该ContextCleaner是否停止的标记。

剩余的三个则分别是从SparkEnv中获取的BlockManagerMaster、BroadcastManager与MapOutputTrackerMaster的对应实例,它们会在之后的清理步骤中用到。

清理任务及弱引用的封装

ContextCleaner中共有5种清理任务,分别对应RDD、Shuffle、广播变量、累加器和检查点,都继承自CleanupTask这个空的特征。它们的定义极其简单,如下。

代码#17.3 - o.a.s.CleanupTask及其子类

  1. private sealed trait CleanupTask
  2. private case class CleanRDD(rddId: Int) extends CleanupTask
  3. private case class CleanShuffle(shuffleId: Int) extends CleanupTask
  4. private case class CleanBroadcast(broadcastId: Long) extends CleanupTask
  5. private case class CleanAccum(accId: Long) extends CleanupTask
  6. private case class CleanCheckpoint(rddId: Int) extends CleanupTask

在上一节讲到的CleanupTaskWeakReference定义如下。当其中的referent对象可达性变为弱可达(weakly reachable)时,对应的CleanupTaskWeakReference实例就会被加入ReferenceQueue中,用于执行清理任务。

代码#17.4 - o.a.s.CleanupTaskWeakReference类

  1. private class CleanupTaskWeakReference(
  2. val task: CleanupTask,
  3. referent: AnyRef,
  4. referenceQueue: ReferenceQueue[AnyRef])
  5. extends WeakReference(referent, referenceQueue)

ContextCleaner的执行流程

启动

在代码#17.1中已经调用了ContextCleaner.start()方法。该方法将清理线程cleaningThread设为守护线程并启动之,然后按照periodicGCInterval的间隔来调度执行System.gc()方法,进而可能触发一次GC。因此,在Spark Application中指定Driver或Executor的JVM参数时,一定不要加上-XX:-DisableExplicitGC,该参数会使System.gc()的调用无效化。

代码#17.5 - o.a.s.ContextCleaner.start()方法

  1. def start(): Unit = {
  2. cleaningThread.setDaemon(true)
  3. cleaningThread.setName("Spark Context Cleaner")
  4. cleaningThread.start()
  5. periodicGCService.scheduleAtFixedRate(new Runnable {
  6. override def run(): Unit = System.gc()
  7. }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
  8. }
清理逻辑

ContextCleaner提供了registerForCleanup()方法,用来将CleanupTask及其对应要清理的对象加入referenceBuffer集合中。下面来看代码#17.2中提到的keepCleaning()方法。

代码#17.6 - o.a.s.ContextCleaner.keepCleaning()方法

  1. private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
  2. while (!stopped) {
  3. try {
  4. val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
  5. .map(_.asInstanceOf[CleanupTaskWeakReference])
  6. synchronized {
  7. reference.foreach { ref =>
  8. logDebug("Got cleaning task " + ref.task)
  9. referenceBuffer.remove(ref)
  10. ref.task match {
  11. case CleanRDD(rddId) =>
  12. doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
  13. case CleanShuffle(shuffleId) =>
  14. doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
  15. case CleanBroadcast(broadcastId) =>
  16. doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
  17. case CleanAccum(accId) =>
  18. doCleanupAccum(accId, blocking = blockOnCleanupTasks)
  19. case CleanCheckpoint(rddId) =>
  20. doCleanCheckpoint(rddId)
  21. }
  22. }
  23. }
  24. } catch {
  25. case ie: InterruptedException if stopped => // ignore
  26. case e: Exception => logError("Error in cleaning thread", e)
  27. }
  28. }
  29. }

该方法从ReferenceQueue中取出CleanupTaskWeakReference,然后将其包含的CleanupTask进行模式匹配,并对五种情况分别调用不同的方法。以清理RDD和Shuffle数据的方法为例来看一看。

代码#17.7 - o.a.s.ContextCleaner.doCleanupRDD()/doCleanupShuffle()方法

  1. def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {
  2. try {
  3. logDebug("Cleaning RDD " + rddId)
  4. sc.unpersistRDD(rddId, blocking)
  5. listeners.asScala.foreach(_.rddCleaned(rddId))
  6. logInfo("Cleaned RDD " + rddId)
  7. } catch {
  8. case e: Exception => logError("Error cleaning RDD " + rddId, e)
  9. }
  10. }
  11. def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
  12. try {
  13. logDebug("Cleaning shuffle " + shuffleId)
  14. mapOutputTrackerMaster.unregisterShuffle(shuffleId)
  15. blockManagerMaster.removeShuffle(shuffleId, blocking)
  16. listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
  17. logInfo("Cleaned shuffle " + shuffleId)
  18. } catch {
  19. case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
  20. }
  21. }

可见,清理RDD是调用了SparkContext.unpersistRDD()方法来反持久化一个RDD。清理Shuffle则需要同时从MapOutputTracker与BlockManager中反注册Shuffle。清理完毕后再调用各个监听器的监听方法进行记录。

总结

本文简要介绍了ContextCleaner的初始化、启动和清理的具体流程。

在讲完ContextCleaner之后,围绕SparkContext展开的这部分体系也进入了尾声。我们会检查一下前面是否还有漏掉的重要内容,如果没有的话,大概是时候进入Spark Core的核心之一——RDD了。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/木道寻08/article/detail/759901
推荐阅读
相关标签
  

闽ICP备14008679号