赞
踩
1.spark master HA 机制概览
与hadoop一样,spark也存在单点故障问题。为此,spark的standalone模式提供了master的HA:与hadoop一样,一个master处于active状态,另一个处于standby状态,当active挂了,standby会顶上。spark HA的主备切换主要基于两种机制:基于文件系统和基于zk集群。前者在master挂掉后需要手动切换,而基于zk的HA可以自动完成切换。切换的过程如下:首先,standby的master去读取持久化(可能是磁盘或者zk)的storedApps、storedDrivers、storedWorkers的相关信息。读取后,如果storedApps、storedDrivers、storedWorkers有任何一个非空,那么就会启动master恢复机制,将持久化的app、driver、worker信息重新注册到内存中,将application和worker的状态修改为UNKNOWN,然后向该app对应的driver以及各个worker发送自己的地址信息。driver、worker如果没有挂,那么在接收到master发送的地址后,就会返回响应消息给新的master。此时,master在接收到driver、worker的消息后,会调用completeRecovery对没有响应的组件进行清理,最后调用master的schedule方法,对正在等待的driver和app进行调度,比如启动executor。
2.下面源码分析
master位于org.apache.spark.deploy.master下,状态的恢复在beginRecovery & completeRecovery方法中:
beginRecovery 方法首先重新注册APP到内存中,注册代码如下:
- private def registerApplication(app: ApplicationInfo): Unit = {
- val appAddress = app.driver.address
- if (addressToApp.contains(appAddress)) {
- logInfo("Attempted to re-register application at same address: " + appAddress)
- return
- }
-
- applicationMetricsSystem.registerSource(app.appSource)
- apps += app
- idToApp(app.id) = app
- endpointToApp(app.driver) = app
- addressToApp(appAddress) = app
- waitingApps += app
- }
然后将需要恢复的app和worker的状态改为UNKNOWN(driver只是被重新加入内存中的drivers列表,等待后续调度):
- private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
- storedWorkers: Seq[WorkerInfo]) {
- for (app <- storedApps) {
- logInfo("Trying to recover app: " + app.id)
- try {
- <strong><span style="color:#33cc00;">registerApplication(app)</span>
- <span style="color:#33cc00;">app.state = ApplicationState.UNKNOWN</span></strong>
- app.driver.send(MasterChanged(self, masterWebUiUrl))
- } catch {
- case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
- }
- }
-
- for (driver <- storedDrivers) {
- // Here we just read in the list of drivers. Any drivers associated with now-lost workers
- // will be re-launched when we detect that the worker is missing.
- drivers += driver
- }
-
- for (worker <- storedWorkers) {
- logInfo("Trying to recover worker: " + worker.id)
- try {
- registerWorker(worker)
- worker.state = WorkerState.UNKNOWN
- worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
- } catch {
- case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
- }
- }
- }
得不到响应的worker和app会被移除;未被任何worker认领的driver则根据是否设置了supervise,决定是重新调度还是直接移除:
- private def completeRecovery() {
- // Ensure "only-once" recovery semantics using a short synchronization period.
- if (state != RecoveryState.RECOVERING) { return }
- state = RecoveryState.COMPLETING_RECOVERY
-
- // Kill off any workers and apps that didn't respond to us.
- //前面已经将新恢复worker和app的状态改成unknown,调用removeWorker和finishApplication移除worker和app
- <span style="white-space:pre"> </span>workers.filter(_.state == WorkerState.UNKNOWN).foreach(<strong><span style="color:#009900;">removeWorker</span></strong>)
- apps.filter(_.state == ApplicationState.UNKNOWN).foreach(<strong><span style="color:#33cc00;">finishApplication</span></strong>)
-
- // Reschedule drivers which were not claimed by any workers
- drivers.filter(_.worker.isEmpty).foreach { d =>
- logWarning(s"Driver ${d.id} was not found after master recovery")
- if (d.desc.supervise) {
- logWarning(s"Re-launching ${d.id}")
- <strong>relaunchDriver(d)</strong>
- } else {
- removeDriver(d.id, DriverState.ERROR, None)
- logWarning(s"Did not re-launch ${d.id} because it was not supervised")
- }
- }
-
- state = RecoveryState.ALIVE
- schedule()
- logInfo("Recovery complete - resuming operations!")
- }
下面查看removeWorker的源码:先将worker标记为DEAD并从内存结构中移除,然后通知并移除其上的executor,再处理其上的driver,最后移除该worker的持久化信息。
- private def removeWorker(worker: WorkerInfo) {
- logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
- //首先,将work的状态改为dead
- worker.setState(WorkerState.DEAD)
- //然后,从内存结构中移除该worker,idToWorker是一个记录worker的hashmap
- idToWorker -= worker.id
- addressToWorker -= worker.endpoint.address
- //遍历属于该worker的所有executor,向使用每个executor的driver发送消息,告知该executor挂了,并移除该executor
- for (exec <- worker.executors.values) {
- logInfo("Telling app of lost executor: " + exec.id)
- exec.application.driver.send(ExecutorUpdated(
- exec.id, ExecutorState.LOST, Some("worker lost"), None))
- exec.application.removeExecutor(exec)
- }
- //对于driver,如果设置了对driver的监控,那么重启该driver,否则直接移除该driver
- for (driver <- worker.drivers.values) {
- if (driver.desc.<strong><span style="color:#ff0000;">supervise</span></strong>) {
- logInfo(s"Re-launching ${driver.id}")
- <span style="color:#33cc00;"><strong>relaunchDriver(driver)</strong></span>
- } else {
- logInfo(s"Not re-launching ${driver.id} because it was not supervised")
- removeDriver(driver.id, DriverState.ERROR, None)
- }
- }
- //最后移除worker
- persistenceEngine.removeWorker(worker)
- }
- private def relaunchDriver(driver: DriverInfo) {
- driver.worker = None
- driver.state = DriverState.RELAUNCHING
- <strong>waitingDrivers </strong>+= driver
- schedule()
- }
- private def removeDriver(
- driverId: String,
- finalState: DriverState,
- exception: Option[Exception]) {
- //找到该driver
- drivers.find(d => d.id == driverId) match {
- case Some(driver) =>
- logInfo(s"Removing driver: $driverId")
- //从内存中移除该driver
- drivers -= driver
- if (completedDrivers.size >= RETAINED_DRIVERS) {
- val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
- completedDrivers.trimStart(toRemove)
- }
- //将该driver加入到已完成的driver中
- completedDrivers += driver
- //移除该driver的持久化信息
- persistenceEngine.removeDriver(driver)
- driver.state = finalState
- driver.exception = exception
- //从该driver所在的worker里移除driver
- driver.worker.foreach(w => w.removeDriver(driver))
- schedule()
- case None =>
- logWarning(s"Asked to remove unknown driver: $driverId")
- }
- }
- }
- def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
- if (apps.contains(app)) {
- logInfo("Removing app " + app.id)
- apps -= app
- idToApp -= app.id
- endpointToApp -= app.driver
- addressToApp -= app.driver.address
- if (completedApps.size >= RETAINED_APPLICATIONS) {
- val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
- completedApps.take(toRemove).foreach( a => {
- Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
- applicationMetricsSystem.removeSource(a.appSource)
- })
- completedApps.trimStart(toRemove)
- }
- completedApps += app // Remember it in our history
- waitingApps -= app
-
- // If application events are logged, use them to rebuild the UI
- asyncRebuildSparkUI(app)
-
- for (exec <- app.executors.values) {
- killExecutor(exec)
- }
- app.markFinished(state)
- if (state != ApplicationState.FINISHED) {
- app.driver.send(ApplicationRemoved(state.toString))
- }
- persistenceEngine.removeApplication(app)
- schedule()
-
- // Tell all workers that the application has finished, so they can clean up any app state.
- workers.foreach { w =>
- w.endpoint.send(ApplicationFinished(app.id))
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。