
Spark source code study (2): Master source analysis (1), the master's active/standby failover mechanism

1. Overview of the Spark master HA mechanism

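Like Hadoop, Spark has a single-point-of-failure problem. To address it, Spark's standalone mode provides master HA: as in Hadoop, one master is active and the other is on standby, and when the active master dies the standby takes over. Failover is based on one of two mechanisms: the file system or a ZooKeeper cluster. With the file-system mode, the switch after a failure has to be done manually, while ZooKeeper-based HA switches over automatically.

The failover proceeds as follows. First, once the standby master becomes the leader, it reads the persisted application, driver, and worker information (storedApps, storedDrivers, storedWorkers) from disk or ZooKeeper. If any of the three is non-empty, the master enters recovery: it re-registers the persisted apps, drivers, and workers in memory, sets the state of every application and worker to UNKNOWN, and sends its own address to each application's driver and to each worker. Drivers and workers that are still alive reply to the new master once they receive that address. The master then calls completeRecovery (once all apps and workers have answered, or after a worker timeout) to clean up every component that did not respond, and finally calls schedule() to schedule the waiting drivers and applications, for example by launching executors.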
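The first decision in the failover above, whether the newly elected master can go straight to ALIVE or must enter recovery, can be sketched as a tiny state model. This is a minimal illustration, not Spark's code: ToyMaster, ToyRecoveryState, and onElectedLeader are hypothetical names, and the stored apps/drivers/workers are reduced to plain ID strings.

```scala
// Hypothetical, simplified model of the master's recovery state.
// Spark's own values live in RecoveryState (ALIVE, RECOVERING, ...).
sealed trait ToyRecoveryState
case object Standby extends ToyRecoveryState
case object Alive extends ToyRecoveryState
case object Recovering extends ToyRecoveryState

// Hypothetical stand-in for the master; it starts out as a standby.
class ToyMaster {
  var state: ToyRecoveryState = Standby

  // storedApps / storedDrivers / storedWorkers stand in for what the persistence
  // engine (file system or ZooKeeper) returns after this master becomes leader.
  def onElectedLeader(storedApps: Seq[String],
                      storedDrivers: Seq[String],
                      storedWorkers: Seq[String]): Unit = {
    state =
      if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) Alive
      else Recovering // any persisted app, driver, or worker triggers recovery
  }
}
```

A master elected with nothing persisted goes straight to Alive; one that finds even a single stored worker goes to Recovering and would then run beginRecovery.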

2. Source code analysis

The Master class lives in the org.apache.spark.deploy.master package; state recovery happens in the beginRecovery and completeRecovery methods.

beginRecovery first re-registers each stored application in memory. The registration code is:


```scala
private def registerApplication(app: ApplicationInfo): Unit = {
  val appAddress = app.driver.address
  if (addressToApp.contains(appAddress)) {
    logInfo("Attempted to re-register application at same address: " + appAddress)
    return
  }
  applicationMetricsSystem.registerSource(app.appSource)
  apps += app
  idToApp(app.id) = app
  endpointToApp(app.driver) = app
  addressToApp(appAddress) = app
  waitingApps += app
}
```
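The address check at the top of registerApplication makes registration idempotent per driver address, which matters when a recovered app is registered again. The sketch below models only that bookkeeping; ToyApp and ToyAppRegistry are hypothetical names, and the metrics registration and endpointToApp map are left out.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ApplicationInfo: only the fields this sketch needs.
final case class ToyApp(id: String, driverAddress: String)

// Simplified model of the master's in-memory application bookkeeping.
class ToyAppRegistry {
  val apps = mutable.HashSet.empty[ToyApp]
  val idToApp = mutable.HashMap.empty[String, ToyApp]
  val addressToApp = mutable.HashMap.empty[String, ToyApp]
  val waitingApps = mutable.ArrayBuffer.empty[ToyApp]

  /** Returns false and changes nothing if an app is already registered at this address. */
  def register(app: ToyApp): Boolean =
    if (addressToApp.contains(app.driverAddress)) {
      false // mirrors the "re-register application at same address" early return
    } else {
      apps += app
      idToApp(app.id) = app
      addressToApp(app.driverAddress) = app
      waitingApps += app // newly registered apps wait for schedule()
      true
    }
}
```

A second registration from the same driver address is ignored, so the indexes never hold two apps for one driver.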


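It then sets the state of each recovered app and worker to UNKNOWN and sends each of them a MasterChanged message; stored drivers are simply read back into the drivers list: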

```scala
private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
    storedWorkers: Seq[WorkerInfo]) {
  for (app <- storedApps) {
    logInfo("Trying to recover app: " + app.id)
    try {
      registerApplication(app)
      app.state = ApplicationState.UNKNOWN
      app.driver.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
    }
  }

  for (driver <- storedDrivers) {
    // Here we just read in the list of drivers. Any drivers associated with now-lost workers
    // will be re-launched when we detect that the worker is missing.
    drivers += driver
  }

  for (worker <- storedWorkers) {
    logInfo("Trying to recover worker: " + worker.id)
    try {
      registerWorker(worker)
      worker.state = WorkerState.UNKNOWN
      worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
    } catch {
      case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
    }
  }
}
```
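The effect of beginRecovery can be reduced to three rules: apps and workers become UNKNOWN, each one is sent the new master's address, and drivers are only reloaded. The sketch below checks those rules on a toy model; RecApp, RecWorker, and ToyBeginRecovery are hypothetical, and sending MasterChanged is simulated by recording the target's ID.

```scala
import scala.collection.mutable

// Toy states; Spark uses the ApplicationState / WorkerState enumerations.
sealed trait ToyState
case object Unknown extends ToyState
case object Running extends ToyState

// Hypothetical mutable records for recovered apps and workers.
final class RecApp(val id: String, var state: ToyState = Running)
final class RecWorker(val id: String, var state: ToyState = Running)

class ToyBeginRecovery {
  val apps = mutable.ArrayBuffer.empty[RecApp]
  val workers = mutable.ArrayBuffer.empty[RecWorker]
  val drivers = mutable.ArrayBuffer.empty[String]
  // Stand-in for endpoint.send(MasterChanged(...)): records who was told.
  val masterChangedSentTo = mutable.ArrayBuffer.empty[String]

  def beginRecovery(storedApps: Seq[RecApp], storedDrivers: Seq[String],
                    storedWorkers: Seq[RecWorker]): Unit = {
    for (app <- storedApps) {
      apps += app
      app.state = Unknown           // not yet confirmed alive
      masterChangedSentTo += app.id // tell the app's driver about the new master
    }
    drivers ++= storedDrivers       // drivers are only read back, not marked UNKNOWN
    for (w <- storedWorkers) {
      workers += w
      w.state = Unknown
      masterChangedSentTo += w.id
    }
  }
}
```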


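completeRecovery removes the workers and apps that never responded, and relaunches or removes drivers that no worker claimed: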

```scala
private def completeRecovery() {
  // Ensure "only-once" recovery semantics using a short synchronization period.
  if (state != RecoveryState.RECOVERING) { return }
  state = RecoveryState.COMPLETING_RECOVERY

  // Kill off any workers and apps that didn't respond to us.
  // beginRecovery set every recovered worker and app to UNKNOWN; those still UNKNOWN
  // never answered MasterChanged, so remove them via removeWorker and finishApplication.
  workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
  apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

  // Reschedule drivers which were not claimed by any workers
  drivers.filter(_.worker.isEmpty).foreach { d =>
    logWarning(s"Driver ${d.id} was not found after master recovery")
    if (d.desc.supervise) {
      logWarning(s"Re-launching ${d.id}")
      relaunchDriver(d)
    } else {
      removeDriver(d.id, DriverState.ERROR, None)
      logWarning(s"Did not re-launch ${d.id} because it was not supervised")
    }
  }

  state = RecoveryState.ALIVE
  schedule()
  logInfo("Recovery complete - resuming operations!")
}
```
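The cleanup rules of completeRecovery can be exercised on a toy model: whatever is still UNKNOWN is dropped, and each unclaimed driver is relaunched if supervised, otherwise removed. All names here (CrWorker, CrApp, CrDriver, ToyCompleteRecovery) are hypothetical; relaunching is only recorded, and the driver handling that the real removeWorker performs for a dead worker is not modeled.

```scala
import scala.collection.mutable

sealed trait NodeState
case object NodeUnknown extends NodeState
case object NodeAlive extends NodeState

final class CrWorker(val id: String, var state: NodeState)
final class CrApp(val id: String, var state: NodeState)
// worker = None means no worker claimed this driver after failover.
final class CrDriver(val id: String, val supervise: Boolean, var worker: Option[String])

class ToyCompleteRecovery(val workers: mutable.ArrayBuffer[CrWorker],
                          val apps: mutable.ArrayBuffer[CrApp],
                          val drivers: mutable.ArrayBuffer[CrDriver]) {
  val relaunched = mutable.ArrayBuffer.empty[String]
  val removedDrivers = mutable.ArrayBuffer.empty[String]

  def completeRecovery(): Unit = {
    // Anything still UNKNOWN never answered MasterChanged: drop it.
    workers --= workers.filter(_.state == NodeUnknown)
    apps --= apps.filter(_.state == NodeUnknown)
    // Drivers no surviving worker claimed: relaunch if supervised, else remove.
    for (d <- drivers.filter(_.worker.isEmpty)) {
      if (d.supervise) relaunched += d.id
      else { drivers -= d; removedDrivers += d.id }
    }
  }
}
```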


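Next, the source of removeWorker: it marks the worker DEAD and drops it from the in-memory maps, then handles the worker's executors, then its drivers, and finally removes the worker's persisted record.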

```scala
private def removeWorker(worker: WorkerInfo) {
  logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
  // First, mark the worker as DEAD
  worker.setState(WorkerState.DEAD)
  // Then drop it from the in-memory structures; idToWorker is a HashMap of worker ID -> worker
  idToWorker -= worker.id
  addressToWorker -= worker.endpoint.address
  // For every executor on this worker, tell the driver using it that the executor was lost,
  // then remove the executor from its application
  for (exec <- worker.executors.values) {
    logInfo("Telling app of lost executor: " + exec.id)
    exec.application.driver.send(ExecutorUpdated(
      exec.id, ExecutorState.LOST, Some("worker lost"), None))
    exec.application.removeExecutor(exec)
  }
  // For each driver: if it is supervised, relaunch it; otherwise remove it
  for (driver <- worker.drivers.values) {
    if (driver.desc.supervise) {
      logInfo(s"Re-launching ${driver.id}")
      relaunchDriver(driver)
    } else {
      logInfo(s"Not re-launching ${driver.id} because it was not supervised")
      removeDriver(driver.id, DriverState.ERROR, None)
    }
  }
  // Finally, remove the worker's persisted record
  persistenceEngine.removeWorker(worker)
}
```
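The order of operations in removeWorker can be checked on a stripped-down model. This is a sketch under simplifying assumptions: RwExecutor, RwDriver, RwWorker, and ToyRemoveWorker are hypothetical, the ExecutorUpdated message is recorded as an (appId, executorId) pair, and relaunch/removal are modeled as appends to two buffers.

```scala
import scala.collection.mutable

// Hypothetical, stripped-down model of removeWorker's bookkeeping.
final case class RwExecutor(id: Int, appId: String)
final case class RwDriver(id: String, supervise: Boolean)
final class RwWorker(val id: String,
                     val executors: Seq[RwExecutor],
                     val drivers: Seq[RwDriver]) {
  var dead = false
}

class ToyRemoveWorker {
  val idToWorker = mutable.HashMap.empty[String, RwWorker]
  // Stand-in for driver.send(ExecutorUpdated(id, LOST, ...)): (appId, execId) pairs.
  val lostExecutorNotices = mutable.ArrayBuffer.empty[(String, Int)]
  val waitingDrivers = mutable.ArrayBuffer.empty[RwDriver] // relaunched drivers
  val erroredDrivers = mutable.ArrayBuffer.empty[String]   // removed with ERROR

  def removeWorker(worker: RwWorker): Unit = {
    worker.dead = true                 // 1. mark DEAD
    idToWorker -= worker.id            // 2. drop from in-memory indexes
    for (exec <- worker.executors)     // 3. tell each app its executor is lost
      lostExecutorNotices += ((exec.appId, exec.id))
    for (d <- worker.drivers) {        // 4. relaunch supervised drivers, fail the rest
      if (d.supervise) waitingDrivers += d
      else erroredDrivers += d.id
    }
    // 5. Spark also calls persistenceEngine.removeWorker(worker); omitted here.
  }
}
```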

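relaunchDriver clears the driver's worker, sets its state to RELAUNCHING, and appends it to waitingDrivers; the following schedule() call then launches it again on an available worker. waitingDrivers is an ArrayBuffer: waitingDrivers = new ArrayBuffer[DriverInfo]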


```scala
private def relaunchDriver(driver: DriverInfo) {
  driver.worker = None
  driver.state = DriverState.RELAUNCHING
  waitingDrivers += driver
  schedule()
}
```
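To see how a relaunched driver gets a new home, the sketch pairs relaunchDriver with a deliberately simple placement step. The placement is an assumption for illustration only: it is plain first-fit on free memory, whereas Spark's schedule() also checks cores, shuffles the alive workers, and goes on to launch executors. RdDriver, RdWorker, and ToyRelaunch are hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical model: a driver needs memMb on some worker; worker is where it runs.
final class RdDriver(val id: String, val memMb: Int) {
  var worker: Option[String] = None
  var state: String = "SUBMITTED"
}
final class RdWorker(val id: String, var freeMemMb: Int)

class ToyRelaunch(val workers: Seq[RdWorker]) {
  val waitingDrivers = mutable.ArrayBuffer.empty[RdDriver]

  def relaunchDriver(d: RdDriver): Unit = {
    d.worker = None          // forget the (dead) worker it used to run on
    d.state = "RELAUNCHING"
    waitingDrivers += d
    schedule()
  }

  // Simplified first-fit placement, not Spark's actual scheduling policy.
  def schedule(): Unit =
    for (d <- waitingDrivers.toList) { // iterate over a copy while removing
      workers.find(_.freeMemMb >= d.memMb).foreach { w =>
        w.freeMemMb -= d.memMb
        d.worker = Some(w.id)
        d.state = "RUNNING"
        waitingDrivers -= d
      }
    }
}
```

A driver that fits nowhere simply stays in waitingDrivers in the RELAUNCHING state until a later schedule() call finds room for it.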

removeDriver removes a driver as follows:


```scala
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  // Look up the driver
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      // Remove the driver from memory
      drivers -= driver
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      // Add it to the completed drivers
      completedDrivers += driver
      // Remove the driver's persisted state
      persistenceEngine.removeDriver(driver)
      driver.state = finalState
      driver.exception = exception
      // Remove the driver from the worker it was running on
      driver.worker.foreach(w => w.removeDriver(driver))
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
```
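The completedDrivers handling in removeDriver (and the matching completedApps code in removeApplication) is a bounded history: once the buffer holds RETAINED_DRIVERS entries, the oldest max(RETAINED_DRIVERS / 10, 1) are dropped before appending. A generic sketch of that rule, with the hypothetical helper name appendBounded and remove(0, n) used in place of trimStart:

```scala
import scala.collection.mutable

// Bounded history: when full, evict the oldest max(retained / 10, 1) entries first.
object BoundedHistory {
  def appendBounded[A](history: mutable.ArrayBuffer[A], item: A, retained: Int): Unit = {
    require(retained > 0, "retained must be positive")
    if (history.size >= retained) {
      val toRemove = math.max(retained / 10, 1)
      history.remove(0, toRemove) // same effect as history.trimStart(toRemove)
    }
    history += item
  }
}
```

With retained = 20, appending 1 to 25 leaves the 19 values 7 to 25. Evicting about 10% at once, rather than one entry per append, plausibly keeps the cost of shifting the ArrayBuffer's front down to one shift every few appends.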

finishApplication simply calls removeApplication with ApplicationState.FINISHED, and the removal works much like removeDriver:

```scala
def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
  if (apps.contains(app)) {
    logInfo("Removing app " + app.id)
    apps -= app
    idToApp -= app.id
    endpointToApp -= app.driver
    addressToApp -= app.driver.address
    if (completedApps.size >= RETAINED_APPLICATIONS) {
      val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
      completedApps.take(toRemove).foreach( a => {
        Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
        applicationMetricsSystem.removeSource(a.appSource)
      })
      completedApps.trimStart(toRemove)
    }
    completedApps += app // Remember it in our history
    waitingApps -= app

    // If application events are logged, use them to rebuild the UI
    asyncRebuildSparkUI(app)

    for (exec <- app.executors.values) {
      killExecutor(exec)
    }
    app.markFinished(state)
    if (state != ApplicationState.FINISHED) {
      app.driver.send(ApplicationRemoved(state.toString))
    }
    persistenceEngine.removeApplication(app)
    schedule()

    // Tell all workers that the application has finished, so they can clean up any app state.
    workers.foreach { w =>
      w.endpoint.send(ApplicationFinished(app.id))
    }
  }
}
```
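Two details of removeApplication are easy to miss: it is a no-op for an app that is no longer registered, and the driver is sent ApplicationRemoved only when the final state is not FINISHED, while every worker is told either way. The toy model below captures just those rules; RaApp, AppEnd, and ToyRemoveApplication are hypothetical, and message sends are recorded as tuples.

```scala
import scala.collection.mutable

// Toy final states; Spark uses ApplicationState.
sealed trait AppEnd
case object Finished extends AppEnd
case object Failed extends AppEnd
case object Killed extends AppEnd

final case class RaApp(id: String, executorIds: Seq[Int])

class ToyRemoveApplication(workerIds: Seq[String]) {
  val apps = mutable.ArrayBuffer.empty[RaApp]
  val killedExecutors = mutable.ArrayBuffer.empty[Int]
  val driverNotices = mutable.ArrayBuffer.empty[(String, AppEnd)] // ApplicationRemoved stand-in
  val workerNotices = mutable.ArrayBuffer.empty[(String, String)] // ApplicationFinished stand-in

  // As in the Master, finishApplication is removeApplication with FINISHED.
  def finishApplication(app: RaApp): Unit = removeApplication(app, Finished)

  def removeApplication(app: RaApp, state: AppEnd): Unit =
    if (apps.contains(app)) { // unknown apps are ignored
      apps -= app
      killedExecutors ++= app.executorIds
      if (state != Finished) driverNotices += ((app.id, state))
      workerIds.foreach(w => workerNotices += ((w, app.id)))
    }
}
```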


