Parameter (default) | Meaning
---|---
spark.dynamicAllocation.enabled = false | Whether to enable dynamic resource allocation, which scales the number of executors up and down based on cluster load
spark.dynamicAllocation.executorIdleTimeout = 60s | An executor that has been idle longer than this is removed
spark.dynamicAllocation.maxExecutors = infinity | Maximum number of executors to use (unbounded by default)
spark.dynamicAllocation.minExecutors = 0 | Minimum number of executors to keep
spark.dynamicAllocation.schedulerBacklogTimeout = 1s | If tasks have been waiting to run for longer than this, start requesting executors
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout = schedulerBacklogTimeout | Interval between subsequent executor requests while the backlog persists
spark.dynamicAllocation.initialExecutors = spark.dynamicAllocation.minExecutors | Initial number of executors to start with
spark.dynamicAllocation.cachedExecutorIdleTimeout = infinity | An executor holding cached data is removed if it stays idle longer than this
spark.dynamicAllocation.executorAllocationRatio = 1 | By default dynamic allocation requests enough executors for maximum parallelism, which can waste resources for small jobs; lowering this ratio scales the request down, still bounded by the min/max executor settings
spark.dynamicAllocation.shuffleTracking.enabled = false | Enable shuffle file tracking on executors (i.e., keep shuffle data on them), so dynamic allocation does not depend on an external shuffle service
spark.dynamicAllocation.shuffleTracking.timeout = infinity | With shuffle tracking enabled, controls the timeout for executors holding shuffle data
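For reference, a minimal sketch of turning these parameters on from application code. The specific values are illustrative only; on YARN, either the external shuffle service or shuffle tracking must be enabled so that removing an executor does not lose shuffle data.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Illustrative values only; the master (e.g. yarn) comes from spark-submit,
// since dynamic allocation is disabled in local mode (see below).
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.initialExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "10")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  // Keep shuffle data trackable so executors can be released safely
  // without an external shuffle service.
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")

val spark = SparkSession.builder().config(conf).getOrCreate()
```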
The first place these settings take effect: org.apache.spark.SparkContext#_executorAllocationManager
// Dynamic allocation parameters only take effect in a non-local environment.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    schedulerBackend match {
      case b: ExecutorAllocationClient =>
        // Dynamic resource allocation is delegated to the ExecutorAllocationManager object.
        Some(new ExecutorAllocationManager(
          schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
          cleaner = cleaner, resourceProfileManager = resourceProfileManager))
      case _ =>
        None
    }
  } else {
    None
  }
// Call ExecutorAllocationManager's start() method.
_executorAllocationManager.foreach(_.start())
def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
  // DYN_ALLOCATION_ENABLED corresponds to the spark.dynamicAllocation.enabled parameter.
  val dynamicAllocationEnabled = conf.get(DYN_ALLOCATION_ENABLED)
  dynamicAllocationEnabled &&
    (!isLocalMaster(conf) || conf.get(DYN_ALLOCATION_TESTING))
}
// Dynamic resource allocation can only be used when the master is not local.
def isLocalMaster(conf: SparkConf): Boolean = {
  val master = conf.get("spark.master", "")
  master == "local" || master.startsWith("local[")
}
All of the dynamic resource allocation work is handed to the ExecutorAllocationManager class, which tries to run the application with maximum parallelism based on cluster load. It is created during SparkContext initialization.

Note that three parameters are involved here: spark.dynamicAllocation.minExecutors (default 0), spark.dynamicAllocation.initialExecutors (default = minExecutors), and spark.executor.instances (default 0). The maximum of the three is used as the initial number of executors to allocate.
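A minimal sketch of that max-of-three rule, re-implemented as a plain helper for illustration (not the Spark code itself):

```scala
import org.apache.spark.SparkConf

// Illustration only: how the initial executor target is derived
// from the three parameters (defaults shown as fallbacks).
def initialExecutors(conf: SparkConf): Int = {
  val min     = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
  val initial = conf.getInt("spark.dynamicAllocation.initialExecutors", min)
  val static  = conf.getInt("spark.executor.instances", 0)
  Seq(min, initial, static).max
}

// e.g. minExecutors=2, initialExecutors unset (falls back to 2),
// spark.executor.instances=4  =>  initial target = 4
```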
org.apache.spark.ExecutorAllocationManager#start
/**
 * Register for scheduler callbacks to decide when to add and remove executors, and start
 * the scheduling task.
 */
def start(): Unit = {
  listenerBus.addToManagementQueue(listener)
  listenerBus.addToManagementQueue(executorMonitor)
  cleaner.foreach(_.attachListener(executorMonitor))

  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule()
      } catch {
        case ct: ControlThrowable =>
          throw ct
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
  // Periodic task: request executors or reclaim expired ones.
  // intervalMillis defaults to 100 (milliseconds).
  executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)

  // Request the initial number of executors. numExecutorsTarget is initialized to the maximum of
  // spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.initialExecutors and
  // spark.executor.instances.
  client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
}
org.apache.spark.ExecutorAllocationManager#schedule
/**
 * This is called at a fixed interval to regulate the number of pending executor requests
 * and number of executors running.
 *
 * First, adjust our requested executors based on the add time and our current needs.
 * Then, if the remove time for an existing executor has expired, kill the executor.
 *
 * This is factored out into its own method for testing.
 */
private def schedule(): Unit = synchronized {
  val executorIdsToBeRemoved = executorMonitor.timedOutExecutors()
  if (executorIdsToBeRemoved.nonEmpty) {
    initializing = false
  }

  // Sync the number of executors we actually need right now.
  updateAndSyncNumExecutorsTarget(clock.nanoTime())
  // Remove executors that have timed out.
  if (executorIdsToBeRemoved.nonEmpty) {
    removeExecutors(executorIdsToBeRemoved)
  }
}
The overall call flow is shown in the diagram below:
schedule() is a timer task that runs every 100 ms.
Let's first look at the updateAndSyncNumExecutorsTarget and removeExecutors methods, since internally they too end up calling requestTotalExecutors.
Note the new parameter used here: spark.dynamicAllocation.sustainedSchedulerBacklogTimeout, which defaults to spark.dynamicAllocation.schedulerBacklogTimeout (itself defaulting to 1s).
org.apache.spark.ExecutorAllocationManager#updateAndSyncNumExecutorsTarget
private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
  // The maximum number of executors we need right now.
  val maxNeeded = maxNumExecutorsNeeded

  if (initializing) {
    // Still initializing.
    0
  } else if (maxNeeded < numExecutorsTarget) {
    // numExecutorsTarget (already requested) exceeds maxNeeded, so executors should be released.
    val oldNumExecutorsTarget = numExecutorsTarget
    // minNumExecutors corresponds to spark.dynamicAllocation.minExecutors, default 0.
    numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
    numExecutorsToAdd = 1

    // The number of executors actually needed is lower than the current target.
    if (numExecutorsTarget < oldNumExecutorsTarget) {
      // Asynchronously ask the cluster manager to release idle executor resources.
      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
      logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
        s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
    }
    // Return the delta; a negative value means executors are being removed.
    numExecutorsTarget - oldNumExecutorsTarget
  } else if (addTime != NOT_SET && now >= addTime) {
    // The number of executors needed exceeds the current target and the backlog timer has expired
    // (spark.dynamicAllocation.sustainedSchedulerBacklogTimeout, which defaults to
    // spark.dynamicAllocation.schedulerBacklogTimeout, default 1s).
    val delta = addExecutors(maxNeeded)
    logDebug(s"Starting timer to add more executors (to " +
      s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
    addTime = now + TimeUnit.SECONDS.toNanos(sustainedSchedulerBacklogTimeoutS)
    delta
  } else {
    0
  }
}
org.apache.spark.ExecutorAllocationManager#maxNumExecutorsNeeded
Note another new parameter here: spark.dynamicAllocation.executorAllocationRatio, default 1.0.
private def maxNumExecutorsNeeded(): Int = {
  // totalPendingTasks includes pending tasks plus pending speculative tasks.
  val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
  // executorAllocationRatio is spark.dynamicAllocation.executorAllocationRatio, default 1.0.
  // tasksPerExecutorForFullParallelism is computed below; the result is rounded up.
  val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
    tasksPerExecutorForFullParallelism).toInt
  if (tasksPerExecutorForFullParallelism > 1 && maxNeeded == 1 &&
      listener.pendingSpeculativeTasks > 0) {
    // If only one executor is needed but speculative tasks are still waiting, allocate one more.
    maxNeeded + 1
  } else {
    maxNeeded
  }
}

// EXECUTOR_CORES corresponds to spark.executor.cores, the number of cores per executor, default 1.
// CPUS_PER_TASK corresponds to spark.task.cpus, the number of cores consumed per task, default 1.
private val tasksPerExecutorForFullParallelism =
  conf.get(EXECUTOR_CORES) / conf.get(CPUS_PER_TASK)

// Request a certain number of executors from the cluster manager.
// If the target has already reached the maximum number of executors, give up (return 0);
// otherwise the batch size doubles on each successful round.
private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
  // maxNumExecutors corresponds to spark.dynamicAllocation.maxExecutors.
  if (numExecutorsTarget >= maxNumExecutors) {
    logDebug(s"Not adding executors because our current target total " +
      s"is already $numExecutorsTarget (limit $maxNumExecutors)")
    numExecutorsToAdd = 1
    return 0
  }

  val oldNumExecutorsTarget = numExecutorsTarget
  // There's no point in wasting time ramping up to the number of executors we already have, so
  // make sure our target is at least as much as our current allocation:
  numExecutorsTarget = math.max(numExecutorsTarget, executorMonitor.executorCount)
  // Boost our target with the number to add for this round:
  numExecutorsTarget += numExecutorsToAdd
  // Ensure that our target doesn't exceed what we need at the present moment:
  numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
  // Ensure that our target fits within configured bounds:
  numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)

  // The number of executors to request this round, recomputed from the adjustments above.
  val delta = numExecutorsTarget - oldNumExecutorsTarget

  // If our target has not changed, do not send a message
  // to the cluster manager and reset our exponential growth
  if (delta == 0) {
    numExecutorsToAdd = 1
    return 0
  }

  val addRequestAcknowledged = try {
    testing ||
      // Requesting executors goes through the same API as releasing them.
      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
  } catch {
    case NonFatal(e) =>
      // Use INFO level so the error it doesn't show up by default in shells. Errors here are more
      // commonly caused by YARN AM restarts, which is a recoverable issue, and generate a lot of
      // noisy output.
      logInfo("Error reaching cluster manager.", e)
      false
  }

  if (addRequestAcknowledged) {
    val executorsString = "executor" + { if (delta > 1) "s" else "" }
    logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
      s" (new desired total will be $numExecutorsTarget)")
    numExecutorsToAdd = if (delta == numExecutorsToAdd) {
      numExecutorsToAdd * 2
    } else {
      1
    }
    delta
  } else {
    logWarning(
      s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
    numExecutorsTarget = oldNumExecutorsTarget
    0
  }
}
Computing the current maximum number of executors needed: the task count is the sum of the listener's pendingTasks, pendingSpeculativeTasks, and totalRunningTasks methods, which are backed by these internal maps:

- stageAttemptToNumTasks → pendingTasks (subtracting stageAttemptToTaskIndices, which tracks already-assigned tasks, gives the tasks still waiting to run)
- stageAttemptToNumRunningTask (running tasks, including speculative ones) → totalRunningTasks
- stageAttemptToNumSpeculativeTasks (speculative tasks, both waiting and running) → pendingSpeculativeTasks (subtracting stageAttemptToSpeculativeTaskIndices, which tracks speculative tasks already running, gives the speculative tasks still waiting to run)
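As a quick worked example of the maxNeeded formula in maxNumExecutorsNeeded (all numbers below are made up for illustration):

```scala
// Hypothetical numbers, just to exercise the formula above.
val pendingTasks            = 37   // waiting tasks (incl. waiting speculative tasks)
val runningTasks            = 3
val executorAllocationRatio = 1.0  // spark.dynamicAllocation.executorAllocationRatio
val executorCores           = 4    // spark.executor.cores
val cpusPerTask             = 1    // spark.task.cpus

val tasksPerExecutorForFullParallelism = executorCores / cpusPerTask   // = 4
val maxNeeded = math.ceil(
  (pendingTasks + runningTasks) * executorAllocationRatio /
    tasksPerExecutorForFullParallelism).toInt                          // ceil(40 / 4) = 10
```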
The logic flow of updateAndSyncNumExecutorsTarget is illustrated below:
TODO: why, when the number of executors actually added equals numExecutorsToAdd, is numExecutorsToAdd doubled for the next round?
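To see that doubling behaviour concretely, here is a small standalone simulation of the ramp-up in addExecutors. It is heavily simplified (no executor monitor, no backlog timers) and all values are illustrative:

```scala
// Simplified, self-contained simulation of the exponential ramp-up in addExecutors.
def simulateRampUp(maxNeeded: Int, minExecutors: Int, maxExecutors: Int): Unit = {
  var target = minExecutors
  var toAdd  = 1
  var round  = 1
  while (target < math.min(maxNeeded, maxExecutors)) {
    val old = target
    target = math.max(math.min(target + toAdd, math.min(maxNeeded, maxExecutors)), minExecutors)
    val delta = target - old
    // Double only when the full batch was granted, exactly like numExecutorsToAdd * 2.
    toAdd = if (delta == toAdd) toAdd * 2 else 1
    println(s"round $round: requested $delta more, target = $target, next batch = $toAdd")
    round += 1
  }
}

simulateRampUp(maxNeeded = 20, minExecutors = 0, maxExecutors = 50)
// round 1: requested 1 more, target = 1,  next batch = 2
// round 2: requested 2 more, target = 3,  next batch = 4
// round 3: requested 4 more, target = 7,  next batch = 8
// round 4: requested 8 more, target = 15, next batch = 16
// round 5: requested 5 more, target = 20, next batch = 1
```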
CoarseGrainedSchedulerBackend implements the requestTotalExecutors method of the ExecutorAllocationClient interface.
Tracing with breakpoints in yarn-client mode, the call path of requestTotalExecutors is as follows:
org.apache.spark.ExecutorAllocationClient#requestTotalExecutors
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#requestTotalExecutors
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#doRequestTotalExecutors
org.apache.spark.scheduler.cluster.YarnSchedulerBackend#doRequestTotalExecutors
org.apache.spark.scheduler.cluster.YarnSchedulerBackend#prepareRequestExecutors
private[cluster] def prepareRequestExecutors(requestedTotal: Int): RequestExecutors = {
  val nodeBlacklist: Set[String] = scheduler.nodeBlacklist()
  // For locality preferences, ignore preferences for nodes that are blacklisted
  val filteredHostToLocalTaskCount =
    hostToLocalTaskCount.filter { case (k, v) => !nodeBlacklist.contains(k) }
  // The driver side sends a RequestExecutors message.
  RequestExecutors(requestedTotal, localityAwareTasks, filteredHostToLocalTaskCount,
    nodeBlacklist)
}
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RequestExecutors
// Request executors by specifying the new total number of executors desired
// This includes executors already pending or running
case class RequestExecutors(
    requestedTotal: Int,
    localityAwareTasks: Int,
    hostToLocalTaskCount: Map[String, Int],
    nodeBlacklist: Set[String])
  extends CoarseGrainedClusterMessage
TODO: what is the difference between dynamic resource allocation and ordinary (static) allocation?
Dynamic allocation can maximize parallel task execution based on the current cluster load, avoiding the resource waste caused by a static allocation that does not match the actual workload.
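For comparison, a hedged sketch of the two configuration styles (the property values are illustrative, not recommendations):

```scala
import org.apache.spark.SparkConf

// Static allocation: a fixed number of executors for the whole application.
val staticConf = new SparkConf()
  .set("spark.executor.instances", "10")           // held for the app's lifetime, busy or not

// Dynamic allocation: the executor count floats between min and max with the workload.
val dynamicConf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")  // idle executors get released
```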
org.apache.spark.deploy.yarn.ApplicationMaster.AMEndpoint#receiveAndReply
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case r: RequestExecutors =>
    Option(allocator) match {
      case Some(a) =>
        // allocator is a YarnAllocator, which requests resources from the ResourceManager.
        if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
          r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {
          resetAllocatorInterval()
        }
        context.reply(true)

      case None =>
        logWarning("Container allocator is not ready to request executors yet.")
        context.reply(false)
    }
org.apache.spark.deploy.yarn.YarnAllocator#requestTotalExecutorsWithPreferredLocalities
def requestTotalExecutorsWithPreferredLocalities(
    requestedTotal: Int,
    localityAwareTasks: Int,
    hostToLocalTaskCount: Map[String, Int],
    nodeBlacklist: Set[String]): Boolean = synchronized {
  this.numLocalityAwareTasks = localityAwareTasks
  this.hostToLocalTaskCounts = hostToLocalTaskCount

  if (requestedTotal != targetNumExecutors) {
    logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
    // Update the number of executors to request. This is the key point: why is there no
    // synchronous request to the ResourceManager here? See the explanation below.
    targetNumExecutors = requestedTotal
    allocatorBlacklistTracker.setSchedulerBlacklistedNodes(nodeBlacklist)
    true
  } else {
    false
  }
}
When the ApplicationMaster is created, a YarnAllocator is created along with it to handle requesting resources from the ResourceManager.
The call chain is as follows:
org.apache.spark.deploy.yarn.ApplicationMaster#runUnmanaged
org.apache.spark.deploy.yarn.ApplicationMaster#createAllocator
The sequence of the createAllocator logic is illustrated below:
The key point is at the end: a background reporter thread is created and started for the ApplicationMaster, which periodically requests resources from the ResourceManager (a sketch of this polling pattern follows the call chain below).
The launchReporterThread call chain is:
org.apache.spark.deploy.yarn.ApplicationMaster#launchReporterThread
org.apache.spark.deploy.yarn.ApplicationMaster#allocationThreadImpl
org.apache.spark.deploy.yarn.YarnAllocator#allocateResources
org.apache.spark.deploy.yarn.YarnAllocator#updateResourceRequests
org.apache.spark.deploy.yarn.YarnAllocator#handleAllocatedContainers
org.apache.spark.deploy.yarn.YarnAllocator#runAllocatedContainers
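A minimal sketch of that asynchronous design (hypothetical names, heavily simplified): requestTotalExecutorsWithPreferredLocalities only updates targetNumExecutors, and the reporter thread later reconciles the target against YARN on its own schedule.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Heavily simplified, hypothetical sketch of the target/reporter split.
// The driver-side RPC only writes the target; the reporter loop does the actual YARN calls.
class ToyAllocator {
  private val targetNumExecutors = new AtomicInteger(0)
  private val runningExecutors   = new AtomicInteger(0)

  // Called from the AM RPC endpoint: cheap, no ResourceManager round trip.
  def requestTotal(requested: Int): Unit = targetNumExecutors.set(requested)

  // Called periodically by the reporter thread (allocateResources in the real code).
  def allocateResources(): Unit = {
    val missing = targetNumExecutors.get() - runningExecutors.get()
    if (missing > 0) {
      println(s"Will request $missing executor container(s) from the ResourceManager")
      runningExecutors.addAndGet(missing)   // pretend YARN granted them immediately
    } else if (missing < 0) {
      println(s"Will release ${-missing} container(s)")
      runningExecutors.addAndGet(missing)
    }
  }
}

val alloc = new ToyAllocator
alloc.requestTotal(3)     // driver asks for 3 executors; returns immediately
alloc.allocateResources() // next reporter tick actually talks to YARN
```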
Environment: yarn-client mode with dynamic resource allocation enabled.
The flow is illustrated below:
A partial log excerpt follows:
------ driver logs ------
22/12/03 22:58:30 INFO ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 1)
------ ApplicationMaster logs ------
22/12/03 09:58:31 INFO YarnAllocator: Driver requested a total number of 1 executor(s).
22/12/03 09:58:31 INFO YarnAllocator: Will request 1 executor container(s), each with 1 core(s) and 896 MB memory (including 384 MB of overhead)
22/12/03 09:58:31 INFO YarnAllocator: Submitted container request for host hadoop3,hadoop2,hadoop1.
22/12/03 09:58:32 INFO AMRMClientImpl: Received new token for : hadoop2:33222
22/12/03 09:58:32 INFO YarnAllocator: Launching container container_1670078106874_0004_01_000002 on host hadoop2 for executor with ID 1
22/12/03 09:58:32 INFO YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
22/12/03 09:58:32 INFO ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
22/12/03 09:58:32 INFO ContainerManagementProtocolProxy: Opening proxy : hadoop2:33222
------ ResourceManager logs ------
2022-12-03 09:58:32,581 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator: assignedContainer application attempt=appattempt_1670078106874_0004_000001 container=null queue=org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.RegularContainerAllocator@24ca8dd clusterResource=<memory:24576, vCores:24> type=NODE_LOCAL requestedPartition=
2022-12-03 09:58:32,581 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue: assignedContainer queue=root usedCapacity=0.041666668 absoluteUsedCapacity=0.041666668 used=<memory:1024, vCores:1> cluster=<memory:24576, vCores:24>
2022-12-03 09:58:32,582 INFO org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl: container_1670078106874_0004_01_000002 Container Transitioned from NEW to ALLOCATED
2022-12-03 09:58:32,582 INFO org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=root OPERATION=AM Allocated Container TARGET=SchedulerApp RESULT=SUCCESS APPID=application_1670078106874_0004 CONTAINERID=container_1670078106874_0004_01_000002 RESOURCE=<memory:1024, vCores:1>
2022-12-03 09:58:32,582 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue: assignedContainer queue=root usedCapacity=0.083333336 absoluteUsedCapacity=0.083333336 used=<memory:2048, vCores:2> cluster=<memory:24576, vCores:24>
2022-12-03 09:58:32,582 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler: Allocation proposal accepted
2022-12-03 09:58:32,810 INFO org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM: Sending NMToken for nodeId : hadoop2:33222 for container : container_1670078106874_0004_01_000002
2022-12-03 09:58:32,811 INFO org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl: container_1670078106874_0004_01_000002 Container Transitioned from ALLOCATED to ACQUIRED
2022-12-03 09:58:33,583 INFO org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl: container_1670078106874_0004_01_000002 Container Transitioned from ACQUIRED to RUNNING
------ driver logs ------
22/12/03 22:58:35 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.168.150.22:56294) with ID 1
22/12/03 22:58:35 INFO ExecutorMonitor: New executor 1 has registered (new total is 1)
22/12/03 22:58:36 INFO BlockManagerMasterEndpoint: Registering block manager hadoop2:43194 with 93.3 MiB RAM, BlockManagerId(1, hadoop2, 43194, None)
22/12/03 22:58:36 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, hadoop2, executor 1, partition 0, NODE_LOCAL, 7557 bytes)
https://blog.csdn.net/lovetechlovelife/article/details/112723766