Spark Core Source Code Analysis: Starting a Spark Cluster (Part 1)

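1 Preface

This article analyzes the source code of Spark 1.3.1. It is a fairly classic release whose RPC layer is implemented with Akka. The 1.6.x releases ship both an Akka-based and a Netty-based RPC implementation, and from 2.0 onward Akka is removed and only Netty remains. Because 1.6.x, which still contains the Akka implementation, is the most widely used line at the moment, the Akka-based 1.3.1 code is the one analyzed here.

Importing the project
Unpack the project archive (spark-1.3.1.zip). In IDEA choose Import Project, select core, click Next, select Maven, and keep clicking Next; you may give the project a new name along the way. After the import succeeds, wait for Maven to resolve the dependencies.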

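2 Analysis

This section walks through the Spark cluster startup flow by reading the code.

Spark cluster startup flow
  1. Run the sbin/start-all.sh script on the master node.
  2. The Master starts. Once up, it starts a timer that periodically checks for timed-out Workers.
  3. The Workers start. Each Worker first sends its registration information to the Master.
  4. On receiving a Worker's registration, the Master saves it in memory and on disk, then sends the master URL back to the Worker.
  5. On receiving the master URL, the Worker starts a timer that periodically sends heartbeats to the Master.
  6. On receiving a heartbeat, the Master looks up the matching WorkerInfo and sets its last-heartbeat time to the current time.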
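Steps 3 to 6 above can be sketched without Akka as a plain state update. This is a minimal model and not Spark's code: `MiniMaster`, `WorkerRecord` and `handle` are hypothetical names, the message classes only borrow the names `RegisterWorker`, `RegisteredWorker` and `Heartbeat` with their fields reduced, and error cases such as duplicate registration are left out.

```scala
import scala.collection.mutable

// Simplified stand-ins for the deploy messages (fields reduced for illustration).
sealed trait Msg
case class RegisterWorker(id: String, host: String, port: Int) extends Msg
case class RegisteredWorker(masterUrl: String) extends Msg
case class Heartbeat(workerId: String) extends Msg

// Hypothetical record of what the master remembers about one worker.
class WorkerRecord(val id: String, var lastHeartbeat: Long)

// Hypothetical master model: a map from worker id to record, plus an injectable clock.
class MiniMaster(masterUrl: String, now: () => Long) {
  val idToWorker = mutable.HashMap.empty[String, WorkerRecord]

  // Returns the reply to send back to the worker, if any.
  def handle(msg: Msg): Option[Msg] = msg match {
    case RegisterWorker(id, _, _) =>
      idToWorker(id) = new WorkerRecord(id, now()) // step 4: remember the worker
      Some(RegisteredWorker(masterUrl))            // step 4: reply with the master URL
    case Heartbeat(id) =>
      // step 6: refresh the timestamp of a known worker; unknown ids are ignored here
      idToWorker.get(id).foreach(w => w.lastHeartbeat = now())
      None
    case _ => None
  }
}
```

Passing the clock in as a function keeps the sketch deterministic: a caller can advance time by hand and observe that a `Heartbeat` moves `lastHeartbeat` forward.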
Startup script analysis
  1. On the master node, sbin/start-all.sh ->
    sbin/spark-config.sh, sbin/start-master.sh, sbin/start-slaves.sh
  2. sbin/start-master.sh -> sbin/spark-daemon.sh start
    org.apache.spark.deploy.master.Master
  3. sbin/spark-daemon.sh -> run_command class
    org.apache.spark.deploy.master.Master -> bin/spark-class start
    org.apache.spark.deploy.master.Master
  4. bin/spark-class -> org.apache.spark.launcher.Main
    org.apache.spark.deploy.master.Master
  5. sbin/start-slaves.sh -> sbin/slaves.sh start-slave.sh
    spark://hadoop01:7077
  6. sbin/start-slave.sh -> sbin/spark-daemon.sh
    org.apache.spark.deploy.worker.Worker
  7. sbin/spark-daemon.sh -> bin/spark-class org.apache.spark.launcher.Main
    org.apache.spark.deploy.worker.Worker

Note: in step 3, run_command is a shell function.
In step 5, slaves.sh logs in to each worker node over ssh and runs the start-slave.sh script there,
which starts the Worker process.

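2.1 Starting the Master

org.apache.spark.deploy.master.Master

Master startup flow:
1. Enter through the main method of the companion object
main first prepares the parameters for starting the Master, which are wrapped in the args object (val args = new MasterArguments(argStrings, conf)). It then calls startSystemAndActor() to start the Master; that method returns a tuple whose first element is the ActorSystem instance.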

private[spark] object Master extends Logging {
  val systemName = "sparkMaster"
  private val actorName = "Master"

  def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    val conf = new SparkConf
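    // Prepare the parameters needed to start the Master
    val args = new MasterArguments(argStrings, conf)
    // Start the Master: create the ActorSystem instance and start the Master actor.
    // startSystemAndActor returns a four-tuple; only the ActorSystem is used here.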
    val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
    actorSystem.awaitTermination()
  }
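The parameter preparation step can be pictured as defaults that command-line flags override. The following is a trimmed-down imitation written for this article, not the real MasterArguments class: `MiniMasterArgs` and `MiniArgsParser` are hypothetical names, only three flags are handled, and "localhost" is a placeholder for the host default. The port defaults 7077 and 8080 match the standalone Master's usual defaults.

```scala
// Hypothetical container for the parsed values; defaults apply when a flag is absent.
case class MiniMasterArgs(host: String = "localhost", port: Int = 7077, webUiPort: Int = 8080)

object MiniArgsParser {
  // Walk the argument list recursively, consuming one flag and its value per step.
  @annotation.tailrec
  def parse(args: List[String], acc: MiniMasterArgs = MiniMasterArgs()): MiniMasterArgs =
    args match {
      case ("--host" | "-h") :: value :: tail => parse(tail, acc.copy(host = value))
      case ("--port" | "-p") :: value :: tail => parse(tail, acc.copy(port = value.toInt))
      case "--webui-port" :: value :: tail    => parse(tail, acc.copy(webUiPort = value.toInt))
      case Nil                                => acc
      case other :: _ =>
        throw new IllegalArgumentException(s"Unrecognized option: $other")
    }
}
```

For example, `MiniArgsParser.parse(List("--host", "hadoop01", "-p", "7078"))` overrides the host and port and keeps the default web UI port.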

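2. Inside startSystemAndActor()
The method first calls AkkaUtils.createActorSystem() with systemName, host, port, conf and so on to obtain an ActorSystem, then calls actorSystem.actorOf() to create and start the actor that belongs to the Master.

Note: creating the Master actor instance runs the code in the Master class's primary constructor, which performs the basic initialization;
once actor.start() has been called, the lifecycle methods begin to run.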

 /**
   * Start the Master and return a four tuple of:
   *   (1) The Master actor system
   *   (2) The bound port
   *   (3) The web UI bound port
   *   (4) The REST server bound port, if any
   */
  def startSystemAndActor(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
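    // Create the ActorSystem instance through the AkkaUtils helper
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
      securityManager = securityMgr)
    // Create the Master's actor; Props(classOf[Master], ...) instantiates it reflectively.
    // To follow actorOf in the IDE: Ctrl + Alt + left click, then choose ActorSystemImpl.
    // Internally it calls guardian.underlying.attachChild(props, name, systemService = false),
    // which returns an ActorRef and calls start() on it before returning.
    // Creating the Master actor instance runs the code in Master's primary constructor;
    // after start() is called, the lifecycle methods begin to run.
    // preStart() runs first, and only once;
    // receive() (receiveWithLogging() in Master) then waits for incoming messages.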
    val actor = actorSystem.actorOf(
      Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)

    val timeout = AkkaUtils.askTimeout(conf)
    val portsRequest = actor.ask(BoundPortsRequest)(timeout)
    val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
    (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
  }
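The last three lines of startSystemAndActor() follow an ask-then-await pattern: send a request, get a future back, block on it with a timeout, and cast the reply. The sketch below reproduces only that shape with the Scala standard library. `PortsResponder` and `AskDemo` are hypothetical, the message types are simplified local stand-ins, and the 5-second timeout is an assumption in place of AkkaUtils.askTimeout(conf).

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Simplified stand-ins for the request/response pair used in startSystemAndActor.
case object BoundPortsRequest
case class BoundPortsResponse(actorPort: Int, webUIPort: Int, restPort: Option[Int])

// Hypothetical "actor": answers a request on another thread by completing a promise.
class PortsResponder(actorPort: Int, webUIPort: Int, restPort: Option[Int]) {
  def ask(msg: Any): Future[Any] = {
    val reply = Promise[Any]()
    Future {
      msg match {
        case BoundPortsRequest =>
          reply.success(BoundPortsResponse(actorPort, webUIPort, restPort))
        case other =>
          reply.failure(new IllegalArgumentException(s"unexpected message: $other"))
      }
    }
    reply.future
  }
}

object AskDemo {
  // Ask, block until the reply arrives or the timeout expires, then cast the untyped reply.
  def boundPorts(responder: PortsResponder): (Int, Option[Int]) = {
    val timeout = 5.seconds // assumption for this sketch
    val reply = Await.result(responder.ask(BoundPortsRequest), timeout)
    val response = reply.asInstanceOf[BoundPortsResponse]
    (response.webUIPort, response.restPort)
  }
}
```

Blocking is acceptable at this point in the real code because it runs once on the main thread during startup, before the process settles into awaitTermination().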

3. How actorOf() creates and starts the actor
Step into actorOf() (Ctrl + Alt + left click, then choose ActorSystemImpl)

def actorOf(props: Props, name: String): ActorRef = guardian.underlying.attachChild(props, name, systemService = false)

Step into attachChild()

private[akka] def attachChild(props: Props, name: String, systemService: Boolean): ActorRef =
  makeChild(this, props, checkName(name), async = true, systemService = systemService)

Search the current file for makeChild to find the implementation. It returns the ActorRef instance actor, and calls actor.start() just before returning.

private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean, systemService: Boolean): ActorRef = {
  if (cell.system.settings.SerializeAllCreators && !systemService && props.deploy.scope != LocalScope)
    try {
      val ser = SerializationExtension(cell.system)
      props.args forall (arg ⇒
        arg.isInstanceOf[NoSerializationVerificationNeeded] ||
          ser.deserialize(ser.serialize(arg.asInstanceOf[AnyRef]).get, arg.getClass).get != null)
    } catch {
      case NonFatal(e) ⇒ throw new IllegalArgumentException(s"pre-creation serialization check failed at [${cell.self.path}/$name]", e)
    }
  /*
   * in case we are currently terminating, fail external attachChild requests
   * (internal calls cannot happen anyway because we are suspended)
   */
  if (cell.childrenRefs.isTerminating) throw new IllegalStateException("cannot create children while terminating or terminated")
  else {
    reserveChild(name)
    // this name will either be unreserved or overwritten with a real child below
    val actor =
      try {
        val childPath = new ChildActorPath(cell.self.path, name, ActorCell.newUid())
        cell.provider.actorOf(cell.systemImpl, props, cell.self, childPath,
          systemService = systemService, deploy = None, lookupDeploy = true, async = async)
      } catch {
        case e: InterruptedException ⇒
          unreserveChild(name)
          Thread.interrupted() // clear interrupted flag before throwing according to java convention
          throw e
        case NonFatal(e) ⇒
          unreserveChild(name)
          throw e
      }
    // mailbox==null during RoutedActorCell constructor, where suspends are queued otherwise
    if (mailbox ne null) for (_ ← 1 to mailbox.suspendCount) actor.suspend()
    initChild(actor)
    actor.start()
    actor
  }
}
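The net effect of this call chain is an ordering guarantee: the actor's constructor body runs, then preStart() runs exactly once, and only then are messages handed to the receive handler. The toy below illustrates that ordering only. `MiniActor`, `MiniCell` and `LifecycleDemo` are hypothetical, and everything is synchronous, whereas in Akka the construction and preStart() happen asynchronously on a dispatcher thread and early messages wait in the mailbox.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical miniature of an actor: constructor body, preStart hook, receive handler.
abstract class MiniActor {
  def preStart(): Unit = ()
  def receive: PartialFunction[Any, Unit]
}

// Hypothetical cell that plays the role of the ActorRef/ActorCell pair for one actor.
class MiniCell(create: () => MiniActor) {
  private var actor: Option[MiniActor] = None

  // Construct the actor, then call preStart exactly once; later calls do nothing.
  def start(): Unit = if (actor.isEmpty) {
    val a = create()
    a.preStart()
    actor = Some(a)
  }

  // Deliver one message; unhandled or pre-start messages are dropped in this sketch.
  def send(msg: Any): Unit =
    actor.foreach(a => if (a.receive.isDefinedAt(msg)) a.receive(msg))
}

object LifecycleDemo {
  // Records the order in which the three phases run.
  def run(): List[String] = {
    val log = ArrayBuffer.empty[String]
    val cell = new MiniCell(() => new MiniActor {
      log += "constructor"
      override def preStart(): Unit = log += "preStart"
      def receive: PartialFunction[Any, Unit] = {
        case msg: String => log += s"receive($msg)"
      }
    })
    cell.start()
    cell.start() // second call is a no-op, so preStart is not repeated
    cell.send("CheckForWorkerTimeOut")
    cell.send("Heartbeat")
    log.toList
  }
}
```

Calling `LifecycleDemo.run()` yields the phases in order: constructor, preStart, then one receive entry per message.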

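4. Lifecycle methods
preStart() runs first, and only once. The operation in it most relevant to cluster startup is starting the timer that periodically checks for timed-out workers; see the code comments for how it works.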

 // Inherited from Actor: when the actor's start() method runs, preStart is executed first, and only once
  override def preStart() {
    logInfo("Starting Spark master at " + masterUrl)
    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
    // Listen for remote client disconnection events, since they don't go through Akka's watch()
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    webUi.bind()
    masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
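    // Start the timer that checks for timed-out workers.
    // Why is the receiver self? The scheduler periodically sends the message
    // CheckForWorkerTimeOut to the Master itself, which drives the matching handler.
    // See the CheckForWorkerTimeOut case in the receive method: it mainly calls
    // timeOutDeadWorkers() ("Check for, and remove, any timed-out workers").
    // In short, the Master drives itself to detect and clean up timed-out workers.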
    context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)

    masterMetricsSystem.registerSource(masterSource)
    masterMetricsSystem.start()
    applicationMetricsSystem.start()
    // Attach the master and app metrics servlet handler to the web ui after the metrics systems are
    // started.
    masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
    applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
      case "ZOOKEEPER" =>
        logInfo("Persisting recovery state to ZooKeeper")
        val zkFactory =
          new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
      case "FILESYSTEM" =>
        val fsFactory =
          new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system))
        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
      case "CUSTOM" =>
        val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
        val factory = clazz.getConstructor(conf.getClass, Serialization.getClass)
          .newInstance(conf, SerializationExtension(context.system))
          .asInstanceOf[StandaloneRecoveryModeFactory]
        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
      case _ =>
        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
    }
    persistenceEngine = persistenceEngine_
    leaderElectionAgent = leaderElectionAgent_
  }
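The scheduler line in preStart() is a "send a message to myself on a timer" idiom. A rough equivalent with plain JDK classes is sketched below: a scheduled executor plays the Akka scheduler and a blocking queue plays the actor's mailbox. `SelfTicker` and `handleTicks` are hypothetical names, `CheckForWorkerTimeOut` is a local stand-in object, and the period is shortened from WORKER_TIMEOUT to a few milliseconds so the example finishes quickly.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

// Local stand-in for the message the Master schedules to itself.
case object CheckForWorkerTimeOut

// Hypothetical stand-in for an actor with a mailbox and a timer targeting "self".
class SelfTicker(periodMillis: Long, ticksWanted: Int) {
  private val mailbox = new LinkedBlockingQueue[AnyRef]()
  private val timer = Executors.newSingleThreadScheduledExecutor()

  // Returns how many CheckForWorkerTimeOut messages were taken from the mailbox.
  def handleTicks(): Int = {
    // Like schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut):
    // the first message is sent immediately, then one per period.
    timer.scheduleAtFixedRate(new Runnable {
      def run(): Unit = mailbox.put(CheckForWorkerTimeOut)
    }, 0L, periodMillis, TimeUnit.MILLISECONDS)

    var handled = 0
    try {
      while (handled < ticksWanted) {
        Option(mailbox.poll(5, TimeUnit.SECONDS)) match {
          case Some(CheckForWorkerTimeOut) =>
            handled += 1 // the Master would call timeOutDeadWorkers() at this point
          case Some(_) => () // other messages are ignored in this sketch
          case None    => throw new IllegalStateException("timer did not fire")
        }
      }
    } finally timer.shutdownNow()
    handled
  }
}
```

Routing the tick through the mailbox, rather than calling the cleanup directly from the timer thread, means the timeout check is processed like any other message, one at a time, so it never races with RegisterWorker or Heartbeat handling.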

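Next comes receiveWithLogging, the message handler. It runs once for every incoming message, so the actor is effectively always listening. The cases relevant to cluster startup are RegisterWorker, Heartbeat and CheckForWorkerTimeOut. CheckForWorkerTimeOut is covered first; its code is: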

    // The timer started in preStart sends this message to the Master itself in order to drive timeOutDeadWorkers()
    case CheckForWorkerTimeOut => {
      timeOutDeadWorkers()
    }

timeOutDeadWorkers() is mainly used to check for and remove timed-out workers. removeWorker(worker) clears the worker information held both in memory and on disk.

/** Check for, and remove, any timed-out workers */
def timeOutDeadWorkers() {
  // Copy the workers into an array so we don't modify the hashset while iterating through it
  val currentTime = System.currentTimeMillis()
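  // Pick out the workers whose last heartbeat is older than the timeout
  val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
  // Loop over them: remove those not yet marked DEAD (clearing the WorkerInfo held in memory and on disk), and cull long-dead ones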
  for (worker <- toRemove) {
    if (worker.state != WorkerState.DEAD) {
      logWarning("Removing %s because we got no heartbeat in %d seconds".format(
        worker.id, WORKER_TIMEOUT/1000))
      removeWorker(worker)
    } else {
      if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT)) {
        workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
      }
    }
  }
}
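The two thresholds in timeOutDeadWorkers() are easier to see with concrete numbers. The sketch below restates the decision for a single worker as a pure function. `MiniWorker`, `MiniWorkerState` and `TimeoutDemo` are hypothetical, and the constants assume the default settings (spark.worker.timeout of 60 seconds and spark.dead.worker.persistence of 15), which a real deployment may override.

```scala
// Reduced stand-in for the worker states that matter to the timeout check.
object MiniWorkerState extends Enumeration { val ALIVE, DEAD = Value }

// Hypothetical, reduced WorkerInfo: only the fields the timeout check reads.
case class MiniWorker(id: String, lastHeartbeat: Long, state: MiniWorkerState.Value)

object TimeoutDemo {
  val WorkerTimeoutMs: Long = 60 * 1000L // assumed default: 60 s, expressed in ms
  val ReaperIterations: Int = 15         // assumed default number of extra timeout periods

  sealed trait Action
  case object Keep extends Action     // recent heartbeat, or DEAD but still shown in the UI
  case object MarkDead extends Action // corresponds to removeWorker(worker)
  case object Cull extends Action     // corresponds to workers -= worker

  // Same branching as timeOutDeadWorkers(), applied to one worker at time `now`.
  def decide(w: MiniWorker, now: Long): Action =
    if (w.lastHeartbeat >= now - WorkerTimeoutMs) Keep
    else if (w.state != MiniWorkerState.DEAD) MarkDead
    else if (w.lastHeartbeat < now - (ReaperIterations + 1) * WorkerTimeoutMs) Cull
    else Keep
}
```

With these assumed defaults, a worker is marked dead after 60 seconds of silence and dropped from the workers set once its last heartbeat is more than 16 × 60 s = 960 s old.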

Note: the source code analysis of Worker startup is covered in the next article.
