This post analyzes the source code of Spark 1.3.1. It is a classic release in which the RPC layer is implemented purely with Akka; in the 1.6.x line the RPC layer has both an Akka and a Netty implementation, and from 2.0 onward Akka is dropped entirely in favor of Netty. Since 1.6.x is still the most widely used line, 1.3.1 is the version analyzed here.
Importing the project
Unzip the source package (spark-1.3.1.zip). In IDEA choose Import Project, select the core module, click Next, choose Maven, and keep clicking Next (you may give the project a new name). After the import succeeds, wait for Maven to resolve the dependencies.
Analyzing the Spark cluster startup flow through the code
Note: run_command in step 3 is a shell function, not a separate script.
In step 5, slaves.sh logs in to each worker node over ssh and invokes the start-slave.sh script there, which starts the Worker process.
org.apache.spark.deploy.master.Master
The Master startup flow:
1. Enter via the companion object's main() method
The first step is to assemble the parameters for starting the Master; they are encapsulated in an args object (val args = new MasterArguments(argStrings, conf)). Then startSystemAndActor() is called to start the Master, returning (among other things) an actorSystem instance.
private[spark] object Master extends Logging {
  val systemName = "sparkMaster"
  private val actorName = "Master"

  def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    val conf = new SparkConf
    // Prepare the parameters used to start the Master
    val args = new MasterArguments(argStrings, conf)
    // Create the actorSystem instance and start the Master actor;
    // startSystemAndActor returns a four-tuple (see its doc comment below)
    val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
    actorSystem.awaitTermination()
  }
2. Step into the startSystemAndActor() method
This method first calls AkkaUtils.createActorSystem(), passing systemName, host, port, conf and so on, to obtain an actorSystem object; it then calls actorSystem.actorOf() to create and start the actor belonging to the Master.
Note: creating the Master actor instance executes the code in Master's primary constructor, which performs the basic initialization; once actor.start() has been called, the lifecycle methods begin to execute, as the small demo below illustrates.
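A minimal, self-contained Akka sketch of that ordering (the LifecycleDemo actor and demo app are hypothetical illustrations, not Spark code): the constructor runs when actorOf() creates the instance, preStart() runs exactly once before any message is processed, and receive then runs once per incoming message.

import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical demo actor, not part of Spark: shows the order in which
// the constructor, preStart() and receive execute.
class LifecycleDemo extends Actor {
  println("constructor: runs when actorOf() creates the instance")

  override def preStart(): Unit = {
    println("preStart: runs exactly once, before any message is processed")
  }

  // receive is invoked once per message for as long as the actor lives
  def receive = {
    case msg => println(s"receive: got $msg")
  }
}

object LifecycleDemoApp extends App {
  val system = ActorSystem("demo")
  val demo = system.actorOf(Props[LifecycleDemo], "lifecycleDemo")
  demo ! "hello"    // the constructor and preStart have already run by now
  Thread.sleep(500) // let the message be processed before shutting down
  system.shutdown() // Akka 2.3-era shutdown (terminate() in newer Akka)
}

With that ordering in mind, here is the actual startSystemAndActor():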
  /**
   * Start the Master and return a four tuple of:
   *   (1) The Master actor system
   *   (2) The bound port
   *   (3) The web UI bound port
   *   (4) The REST server bound port, if any
   */
  def startSystemAndActor(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    // Create the actorSystem instance via the AkkaUtils helper
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
      securityManager = securityMgr)
    // Create the actor belonging to the Master, using reflection via classOf[Master].
    // actorOf (Ctrl+Alt+click, choose ActorSystemImpl) internally calls
    // guardian.underlying.attachChild(props, name, systemService = false),
    // which returns an ActorRef and calls start() on it before returning.
    // Constructing the Master actor instance runs the code in Master's primary
    // constructor; after start() the lifecycle methods run: first preStart(),
    // exactly once, then receive() (receiveWithLogging() in Master) waits for messages.
    val actor = actorSystem.actorOf(
      Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
    val timeout = AkkaUtils.askTimeout(conf)
    // Ask the Master actor for its bound ports, blocking until it replies
    val portsRequest = actor.ask(BoundPortsRequest)(timeout)
    val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
    (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
  }
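AkkaUtils.createActorSystem itself is not shown in this post. Roughly, it builds an Akka remoting configuration from the SparkConf and binds an ActorSystem to the given host and port. The following is a much-simplified sketch of the idea, not Spark's actual implementation: the config keys are standard Akka 2.3 remoting settings, and the real Spark code sets many more options (timeouts, frame sizes, and so on).

import akka.actor.{ActorSystem, ExtendedActorSystem}
import com.typesafe.config.ConfigFactory

// Simplified sketch of the idea behind AkkaUtils.createActorSystem:
// build a remoting config and bind an ActorSystem to host:port.
def createActorSystemSketch(name: String, host: String, port: Int): (ActorSystem, Int) = {
  val akkaConf = ConfigFactory.parseString(s"""
    |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
    |akka.remote.netty.tcp.hostname = "$host"
    |akka.remote.netty.tcp.port = $port
    """.stripMargin)
  val actorSystem = ActorSystem(name, akkaConf)
  // Read back the port that was actually bound; this matters when port = 0
  // (let the OS pick a free port)
  val boundPort =
    actorSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get
  (actorSystem, boundPort)
}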
3. How actorOf() creates and starts the actor
Step into the actorOf() method (Ctrl+Alt+click and choose ActorSystemImpl):
def actorOf(props: Props, name: String): ActorRef = guardian.underlying.attachChild(props, name, systemService = false)
Step into the attachChild() method:
private[akka] def attachChild(props: Props, name: String, systemService: Boolean): ActorRef =
makeChild(this, props, checkName(name), async = true, systemService = systemService)
Search the current file for makeChild to find the implementation. The method returns the ActorRef instance actor, and calls actor.start() just before returning it:
private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean, systemService: Boolean): ActorRef = {
  if (cell.system.settings.SerializeAllCreators && !systemService && props.deploy.scope != LocalScope)
    try {
      val ser = SerializationExtension(cell.system)
      props.args forall (arg ⇒
        arg.isInstanceOf[NoSerializationVerificationNeeded] ||
          ser.deserialize(ser.serialize(arg.asInstanceOf[AnyRef]).get, arg.getClass).get != null)
    } catch {
      case NonFatal(e) ⇒ throw new IllegalArgumentException(s"pre-creation serialization check failed at [${cell.self.path}/$name]", e)
    }

  /*
   * in case we are currently terminating, fail external attachChild requests
   * (internal calls cannot happen anyway because we are suspended)
   */
  if (cell.childrenRefs.isTerminating) throw new IllegalStateException("cannot create children while terminating or terminated")
  else {
    reserveChild(name)
    // this name will either be unreserved or overwritten with a real child below
    val actor =
      try {
        val childPath = new ChildActorPath(cell.self.path, name, ActorCell.newUid())
        cell.provider.actorOf(cell.systemImpl, props, cell.self, childPath,
          systemService = systemService, deploy = None, lookupDeploy = true, async = async)
      } catch {
        case e: InterruptedException ⇒
          unreserveChild(name)
          Thread.interrupted() // clear interrupted flag before throwing according to java convention
          throw e
        case NonFatal(e) ⇒
          unreserveChild(name)
          throw e
      }
    // mailbox==null during RoutedActorCell constructor, where suspends are queued otherwise
    if (mailbox ne null) for (_ ← 1 to mailbox.suspendCount) actor.suspend()
    initChild(actor)
    actor.start()
    actor
  }
}
4. Lifecycle methods
The preStart() method executes first, and only once. The operation in it most relevant to cluster startup is scheduling a timer that periodically checks for timed-out workers; see the inline comments below.
  // Inherited from Actor: when the actor's start() method executes,
  // preStart() runs first, and only once
  override def preStart() {
    logInfo("Starting Spark master at " + masterUrl)
    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
    // Listen for remote client disconnection events, since they don't go through Akka's watch()
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
    webUi.bind()
    masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
    // Schedule a timer that checks for timed-out workers.
    // Why is the receiver self? The timer periodically drives the Master itself
    // to handle the CheckForWorkerTimeOut message: the matching case in the
    // receive method (receiveWithLogging in Master) calls timeOutDeadWorkers(),
    // which checks for, and removes, any timed-out workers. In short, the Master
    // drives itself to detect and clean up workers whose heartbeats have stopped.
    context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
    masterMetricsSystem.registerSource(masterSource)
    masterMetricsSystem.start()
    applicationMetricsSystem.start()
    // Attach the master and app metrics servlet handler to the web ui after the metrics systems are
    // started.
    masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
    applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
      case "ZOOKEEPER" =>
        logInfo("Persisting recovery state to ZooKeeper")
        val zkFactory =
          new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system))
        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
      case "FILESYSTEM" =>
        val fsFactory =
          new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system))
        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
      case "CUSTOM" =>
        val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory"))
        val factory = clazz.getConstructor(conf.getClass, Serialization.getClass)
          .newInstance(conf, SerializationExtension(context.system))
          .asInstanceOf[StandaloneRecoveryModeFactory]
        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
      case _ =>
        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
    }
    persistenceEngine = persistenceEngine_
    leaderElectionAgent = leaderElectionAgent_
  }
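The timer pattern used above is worth isolating: scheduler.schedule(initialDelay, interval, receiver, message) delivers message to receiver at a fixed rate, so passing self turns one of the actor's own match arms into a periodic task. A minimal standalone sketch (the CheckTick message and PeriodicSelfCheck actor are hypothetical, not Spark code):

import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.duration._

case object CheckTick // hypothetical stand-in for CheckForWorkerTimeOut

class PeriodicSelfCheck extends Actor {
  import context.dispatcher // ExecutionContext required by the scheduler

  override def preStart(): Unit = {
    // Deliver CheckTick to ourselves every second, starting immediately,
    // mirroring how Master schedules CheckForWorkerTimeOut
    context.system.scheduler.schedule(0.millis, 1.second, self, CheckTick)
  }

  def receive = {
    case CheckTick => println("periodic check fired")
  }
}

object PeriodicSelfCheckApp extends App {
  val system = ActorSystem("timerDemo")
  system.actorOf(Props[PeriodicSelfCheck], "periodicCheck")
  Thread.sleep(3000) // observe a few ticks, then shut down
  system.shutdown()
}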
Next, the receiveWithLogging lifecycle method executes repeatedly to handle incoming messages, effectively keeping the actor in a listening state. The pattern matches in this method that relate to cluster startup are RegisterWorker, Heartbeat and CheckForWorkerTimeOut. We first look at the handler for CheckForWorkerTimeOut:
    // The timer started in preStart() sends this message to the Master itself,
    // driving it to execute timeOutDeadWorkers()
    case CheckForWorkerTimeOut => {
      timeOutDeadWorkers()
    }
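The Heartbeat match arm is the other half of this mechanism: each time a Worker reports in, the Master refreshes that worker's lastHeartbeat, which is exactly the value timeOutDeadWorkers() compares against WORKER_TIMEOUT. Trimmed and lightly paraphrased from the 1.3.1 Master (the real code also handles a heartbeat from a worker that was registered but has since been dropped, asking it to re-register):

    // Refresh the heartbeat timestamp of a known worker
    case Heartbeat(workerId) => {
      idToWorker.get(workerId) match {
        case Some(workerInfo) =>
          workerInfo.lastHeartbeat = System.currentTimeMillis()
        case None =>
          // The real code logs a warning here and may ask the worker to re-register
          logWarning(s"Got heartbeat from unregistered worker $workerId")
      }
    }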
The timeOutDeadWorkers() method is mainly used to detect and remove timed-out workers; removeWorker(worker) clears the worker's information from both memory and disk.
  /** Check for, and remove, any timed-out workers */
  def timeOutDeadWorkers() {
    // Copy the workers into an array so we don't modify the hashset while iterating through it
    val currentTime = System.currentTimeMillis()
    // Filter out the workers that have gone too long without a heartbeat
    val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
    // Loop over the timed-out workers and remove them
    // (clearing the WorkerInfo from both memory and disk)
    for (worker <- toRemove) {
      if (worker.state != WorkerState.DEAD) {
        logWarning("Removing %s because we got no heartbeat in %d seconds".format(
          worker.id, WORKER_TIMEOUT/1000))
        removeWorker(worker)
      } else {
        if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT)) {
          workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
        }
      }
    }
  }
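To see the filtering logic in isolation, here is a tiny self-contained sketch (WorkerStub is a hypothetical stand-in for WorkerInfo, which carries far more state): workers whose lastHeartbeat is older than the timeout window are copied into an array and then removed from the set.

import scala.collection.mutable

// Hypothetical stand-in for WorkerInfo, carrying only what the check needs
case class WorkerStub(id: String, var lastHeartbeat: Long)

object TimeoutFilterDemo extends App {
  val workerTimeoutMs = 60 * 1000L // mirrors WORKER_TIMEOUT's default of 60 seconds
  val workers = mutable.HashSet(
    WorkerStub("worker-alive", System.currentTimeMillis()),
    WorkerStub("worker-stale", System.currentTimeMillis() - 2 * workerTimeoutMs)
  )

  val currentTime = System.currentTimeMillis()
  // Copy to an array first, as the real code does, so the set is never
  // mutated while being iterated over
  val toRemove = workers.filter(_.lastHeartbeat < currentTime - workerTimeoutMs).toArray
  for (worker <- toRemove) {
    println(s"removing timed-out ${worker.id}")
    workers -= worker
  }
  println(s"remaining: ${workers.map(_.id).mkString(", ")}") // worker-alive
}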
Note: the source-code analysis of Worker startup is covered in the next post.