Reposted, with additions, from: http://www.qishunwang.net/news_show_82511.aspx
SlotPool is the pool the JobMaster uses to manage slots. It is an interface that defines the slot-management operations.
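Its main methods fall into three groups: lifecycle, ResourceManager/TaskManager connection, and slot operations.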
Method | Description |
---|---|
start | Starts the pool |
suspend | Suspends the pool |
close | Closes the pool |
Method | Description |
---|---|
connectToResourceManager | Connects to the ResourceManager |
disconnectResourceManager | Disconnects from the ResourceManager |
registerTaskManager | Registers a TaskExecutor by its ResourceID |
releaseTaskManager | Releases a TaskExecutor |
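Method | Description |
---|---|
offerSlots | Accepts (or rejects) slots offered by a TaskExecutor |
failAllocation | Marks the slot with the given allocation id as failed |
getAvailableSlotsInformation | Returns information about the currently available slots |
getAllocatedSlotsInformation | Returns information about all allocated slots |
allocateAvailableSlot | Allocates the available slot with the given allocation id under the given request id. If no slot with this allocation id is available, nothing is allocated and the method returns an empty `Optional`. |
requestNewAllocatedSlot | Requests the allocation of a new slot from the resource manager. This method does not return a slot that is already available in the pool; instead, it adds a new slot to the pool, which is allocated immediately and returned. |
requestNewAllocatedBatchSlot | Requests the allocation of a new batch slot from the resource manager. Unlike a normal slot request, a batch slot request only times out if the slot pool does not contain a suitable slot. It also does not react to failure signals from the resource manager. |
disableBatchSlotRequestTimeoutCheck | Disables the batch slot request timeout check. Called when someone else takes over responsibility for the timeout check. |
createAllocatedSlotReport | Creates a report about the allocated slots that belong to the specified task manager. |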
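SlotPoolImpl is the implementation of the SlotPool interface.
The slot pool serves the slot requests issued by the ExecutionGraph.
When it cannot serve a slot request, it tries to acquire new slots from the ResourceManager.
If no ResourceManager is currently available, the ResourceManager rejects the request, or the request times out, the slot request fails.
The slot pool also keeps every slot that was offered to it and accepted, so registered free slots can still be handed out even while the ResourceManager is down.
Slots are only released when they are useless, for example when the job is fully running but some slots are still free.
Every allocation or slot offering is identified by a self-generated AllocationID, which the pool uses to eliminate ambiguity.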
```java
/** The interval (in milliseconds) in which the SlotPool writes its slot distribution on debug level. */
private static final long STATUS_LOG_INTERVAL_MS = 60_000;

private final JobID jobId;

/** All registered TaskManagers, slots will be accepted and used only if the resource is registered. */
private final HashSet<ResourceID> registeredTaskManagers;

/** The book-keeping of all allocated slots (slots of this JobManager that are assigned to a request). */
private final AllocatedSlots allocatedSlots;

/** The book-keeping of all available slots (already given to this JobManager, but not yet carrying a payload). */
private final AvailableSlots availableSlots;

/** All pending requests waiting for slots (requests already sent to the ResourceManager). */
private final DualKeyLinkedMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests;

/** The requests that are waiting for the resource manager to be connected (not yet sent, as there is no connection). */
private final LinkedHashMap<SlotRequestId, PendingRequest> waitingForResourceManager;

/** Timeout for external request calls (e.g. to the ResourceManager or the TaskExecutor). */
private final Time rpcTimeout;

/** Timeout for releasing idle slots. */
private final Time idleSlotTimeout;

/** Timeout for batch slot requests. */
private final Time batchSlotTimeout;

private final Clock clock;

/** the fencing token of the job manager. */
private JobMasterId jobMasterId;

/** The gateway to communicate with resource manager. */
private ResourceManagerGateway resourceManagerGateway;

private String jobManagerAddress;

/** Executor of the component's main thread. */
private ComponentMainThreadExecutor componentMainThreadExecutor;
```
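`pendingRequests` is a `DualKeyLinkedMap`: every pending request can be found both by its `SlotRequestId` and by the `AllocationID` sent to the ResourceManager (`suspend` iterates `keySetB()`, `offerSlot` calls `removeKeyB(...)`). The class below, `DualKeyMapSketch`, is a hypothetical minimal two-key map written only to illustrate that idea; it is not Flink's `DualKeyLinkedMap`, whose full API and internals may differ.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, simplified two-key map: every value is reachable through key A
 * (e.g. a SlotRequestId) and key B (e.g. an AllocationID). Insertion order is kept.
 */
final class DualKeyMapSketch<A, B, V> {
    // Primary storage: key A -> (key B, value), in insertion order.
    private final LinkedHashMap<A, Map.Entry<B, V>> byA = new LinkedHashMap<>();
    // Secondary index: key B -> key A.
    private final HashMap<B, A> bToA = new HashMap<>();

    void put(A keyA, B keyB, V value) {
        // Drop any previous mapping for either key so both indexes stay consistent.
        removeKeyA(keyA);
        removeKeyB(keyB);
        byA.put(keyA, new AbstractMap.SimpleImmutableEntry<>(keyB, value));
        bToA.put(keyB, keyA);
    }

    V removeKeyA(A keyA) {
        Map.Entry<B, V> entry = byA.remove(keyA);
        if (entry == null) {
            return null;
        }
        bToA.remove(entry.getKey());
        return entry.getValue();
    }

    V removeKeyB(B keyB) {
        A keyA = bToA.get(keyB);
        return keyA == null ? null : removeKeyA(keyA);
    }

    /** Snapshot of all B keys in insertion order. */
    Set<B> keySetB() {
        Set<B> result = new LinkedHashSet<>();
        for (Map.Entry<B, V> entry : byA.values()) {
            result.add(entry.getKey());
        }
        return Collections.unmodifiableSet(result);
    }

    int size() {
        return byA.size();
    }
}
```

Removing by either key removes the entry from both indexes, which is what lets `offerSlot` take a request out of `pendingRequests` knowing only the `AllocationID`.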
Lifecycle methods: start, suspend, close
```java
/**
 * Start the slot pool to accept RPC calls.
 *
 * @param jobMasterId The necessary leader id for running the job.
 * @param newJobManagerAddress for the slot requests which are sent to the resource manager
 * @param componentMainThreadExecutor The main thread executor for the job master's main thread.
 */
public void start(
        @Nonnull JobMasterId jobMasterId,
        @Nonnull String newJobManagerAddress,
        @Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {

    this.jobMasterId = jobMasterId;
    this.jobManagerAddress = newJobManagerAddress;
    this.componentMainThreadExecutor = componentMainThreadExecutor;

    // schedule the periodic timeout checks
    scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
    scheduleRunAsync(this::checkBatchSlotTimeout, batchSlotTimeout);

    if (log.isDebugEnabled()) {
        scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }
}
```
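`start` schedules `checkIdleSlot` with `idleSlotTimeout`, and `tryFulfillSlotRequestOrMakeAvailable` (further down) records `clock.relativeTimeMillis()` when a slot becomes available. The body of `checkIdleSlot` is not shown in this post, so the following is only a sketch under an assumption: a slot counts as idle once it has been available for at least the idle timeout. `IdleSlotCheckSketch.findIdleSlots` is a hypothetical helper that uses plain strings for allocation ids.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper illustrating an idle-slot check; not Flink's checkIdleSlot. */
final class IdleSlotCheckSketch {
    private IdleSlotCheckSketch() {}

    /**
     * Returns the allocation ids whose slot has been available for at least idleTimeoutMs.
     *
     * @param availableSince allocation id -> relative time (ms) at which the slot became available
     * @param nowMs          current relative time in ms
     * @param idleTimeoutMs  idle timeout in ms (the role played by idleSlotTimeout)
     */
    static List<String> findIdleSlots(Map<String, Long> availableSince, long nowMs, long idleTimeoutMs) {
        List<String> idle = new ArrayList<>();
        for (Map.Entry<String, Long> entry : availableSince.entrySet()) {
            // Assumption: "idle" means available for at least the full timeout.
            if (nowMs - entry.getValue() >= idleTimeoutMs) {
                idle.add(entry.getKey());
            }
        }
        return idle;
    }
}
```

A periodic task would call such a helper every `idleSlotTimeout` and release the returned slots; how Flink actually releases them is not covered in this post.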
```java
/**
 * Suspends this pool, meaning it has lost its authority to accept and distribute slots.
 */
@Override
public void suspend() {

    componentMainThreadExecutor.assertRunningInMainThread();

    log.info("Suspending SlotPool.");

    // cancel all pending allocations --> we can request these slots
    // again after we regained the leadership
    Set<AllocationID> allocationIds = pendingRequests.keySetB();

    for (AllocationID allocationId : allocationIds) {
        // ask the ResourceManager to cancel the slot request
        resourceManagerGateway.cancelSlotRequest(allocationId);
    }

    // do not accept any requests
    jobMasterId = null;
    resourceManagerGateway = null;

    // Clear (but not release!) the available slots. The TaskManagers should re-register them
    // at the new leader JobManager/SlotPool
    clear();
}
```
```java
@Override
public void close() {
    log.info("Stopping SlotPool.");
    // cancel all pending allocations
    Set<AllocationID> allocationIds = pendingRequests.keySetB();

    for (AllocationID allocationId : allocationIds) {
        resourceManagerGateway.cancelSlotRequest(allocationId);
    }

    // release all registered slots by releasing the corresponding TaskExecutors
    for (ResourceID taskManagerResourceId : registeredTaskManagers) {
        final FlinkException cause = new FlinkException(
            "Releasing TaskManager " + taskManagerResourceId + ", because of stopping of SlotPool");
        releaseTaskManagerInternal(taskManagerResourceId, cause);
    }

    clear();
}
```
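The difference between `suspend` and `close` is easy to miss: both cancel pending requests at the ResourceManager, but only `close` releases the registered TaskExecutors, while `suspend` merely forgets its state so the TaskManagers can re-register at the new leader. The toy class below models just that contrast; `LifecycleSketch` and its fields are hypothetical, and it assumes that `clear()` also empties the set of registered TaskManagers.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical toy model contrasting suspend() and close(); names mirror the post, logic is simplified. */
final class LifecycleSketch {
    final Set<String> pendingAllocationIds = new LinkedHashSet<>();
    final Set<String> registeredTaskManagers = new LinkedHashSet<>();
    final List<String> cancelledAtResourceManager = new ArrayList<>();
    final List<String> releasedTaskManagers = new ArrayList<>();
    boolean acceptingRequests = true;

    /** Lost leadership: cancel pending requests, stop accepting, forget (but do not release) everything. */
    void suspend() {
        cancelledAtResourceManager.addAll(pendingAllocationIds);
        acceptingRequests = false;
        clear();
    }

    /** Shutting down: cancel pending requests and release every registered TaskManager. */
    void close() {
        cancelledAtResourceManager.addAll(pendingAllocationIds);
        releasedTaskManagers.addAll(registeredTaskManagers);
        clear();
    }

    // Assumption: clear() wipes all bookkeeping, including registered TaskManagers.
    private void clear() {
        pendingAllocationIds.clear();
        registeredTaskManagers.clear();
    }
}
```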
ResourceManager and TaskManager methods: connectToResourceManager, disconnectResourceManager, registerTaskManager, releaseTaskManager
```java
/**
 * Connects to the ResourceManager and sends off the requests that were waiting for this connection.
 *
 * @param resourceManagerGateway The RPC gateway for the resource manager.
 */
@Override
public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {
    this.resourceManagerGateway = checkNotNull(resourceManagerGateway);

    // work on all slots waiting for this connection:
    // request the resources for every stashed PendingRequest from the RM
    for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
        requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
    }

    // all sent off
    waitingForResourceManager.clear();
}
```
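`connectToResourceManager` is the flushing half of a stash-and-flush pattern: while `resourceManagerGateway` is `null`, new requests are parked in `waitingForResourceManager` (see `requestNewAllocatedSlotInternal` below), and once a connection exists they are sent in insertion order. A minimal sketch of that pattern follows; `StashingRequesterSketch` is hypothetical and uses a `Consumer<String>` as a stand-in for `ResourceManagerGateway`.

```java
import java.util.LinkedHashMap;
import java.util.function.Consumer;

/** Hypothetical sketch of the stash-and-flush pattern used with waitingForResourceManager. */
final class StashingRequesterSketch {
    // request id -> requested resources; insertion order = request order
    private final LinkedHashMap<String, String> waitingForResourceManager = new LinkedHashMap<>();
    // Stand-in for ResourceManagerGateway: null while disconnected.
    private Consumer<String> resourceManager;

    void request(String requestId, String profile) {
        if (resourceManager == null) {
            // No ResourceManager yet: remember the request and send it later.
            waitingForResourceManager.put(requestId, profile);
        } else {
            resourceManager.accept(requestId);
        }
    }

    void connectToResourceManager(Consumer<String> gateway) {
        this.resourceManager = gateway;
        // Flush everything that was stashed, in the original order.
        for (String requestId : waitingForResourceManager.keySet()) {
            gateway.accept(requestId);
        }
        waitingForResourceManager.clear();
    }

    void disconnectResourceManager() {
        this.resourceManager = null; // later requests are stashed again
    }

    int stashedCount() {
        return waitingForResourceManager.size();
    }
}
```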
disconnectResourceManager: closes the connection to the ResourceManager.
```java
@Override
public void disconnectResourceManager() {
    this.resourceManagerGateway = null;
}
```
```java
/**
 * Registers a TaskManager with this pool; only slots that come from a registered TaskManager are considered valid.
 * This also gives us a way to keep "dead" or "abnormal" TaskManagers out of this pool.
 *
 * @param resourceID The id of the TaskManager
 */
@Override
public boolean registerTaskManager(final ResourceID resourceID) {

    componentMainThreadExecutor.assertRunningInMainThread();

    log.debug("Register new TaskExecutor {}.", resourceID);
    return registeredTaskManagers.add(resourceID);
}
```
```java
/**
 * Unregisters a TaskManager from this pool; all related slots are released and their tasks canceled.
 * Called when we find that a TaskManager has become "dead" or "abnormal" and we decide not to use its slots anymore.
 *
 * @param resourceId The id of the TaskManager
 * @param cause for the releasing of the TaskManager
 */
@Override
public boolean releaseTaskManager(final ResourceID resourceId, final Exception cause) {

    componentMainThreadExecutor.assertRunningInMainThread();

    if (registeredTaskManagers.remove(resourceId)) {
        releaseTaskManagerInternal(resourceId, cause);
        return true;
    } else {
        return false;
    }
}
```
Slot methods: offerSlots, failAllocation, getAvailableSlotsInformation, getAllocatedSlotsInformation, allocateAvailableSlot, requestNewAllocatedSlot, requestNewAllocatedBatchSlot, disableBatchSlotRequestTimeoutCheck, createAllocatedSlotReport
offerSlot

```java
/**
 * Slot offering by TaskExecutor with AllocationID. The AllocationID is originally generated by this pool and
 * transferred through the ResourceManager to the TaskManager. We use it to distinguish the different allocations
 * we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending
 * request waiting for this slot (maybe fulfilled by some other returned slot).
 *
 * @param taskManagerLocation location from where the offer comes from
 * @param taskManagerGateway TaskManager gateway
 * @param slotOffer the offered slot
 * @return True if we accept the offering
 */
boolean offerSlot(
        final TaskManagerLocation taskManagerLocation,
        final TaskManagerGateway taskManagerGateway,
        final SlotOffer slotOffer) {

    componentMainThreadExecutor.assertRunningInMainThread();

    // check if this TaskManager is valid
    final ResourceID resourceID = taskManagerLocation.getResourceID();
    final AllocationID allocationID = slotOffer.getAllocationId();

    // the offer must come from a registered TaskManager
    if (!registeredTaskManagers.contains(resourceID)) {
        log.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
                slotOffer.getAllocationId(), taskManagerLocation);
        return false;
    }

    // check whether we are already using this slot,
    // i.e. whether its AllocationID is already known to the SlotPool
    AllocatedSlot existingSlot;
    if ((existingSlot = allocatedSlots.get(allocationID)) != null ||
        (existingSlot = availableSlots.get(allocationID)) != null) {

        // we need to figure out if this is a repeated offer for the exact same slot,
        // or another offer that comes from a different TaskManager after the ResourceManager
        // re-tried the request

        // we write this in terms of comparing slot IDs, because the Slot IDs are the identifiers of
        // the actual slots on the TaskManagers
        // Note: The slotOffer should have the SlotID
        final SlotID existingSlotId = existingSlot.getSlotId();
        final SlotID newSlotId = new SlotID(taskManagerLocation.getResourceID(), slotOffer.getSlotIndex());

        if (existingSlotId.equals(newSlotId)) {
            // the SlotPool already accepted this slot: the TaskExecutor sent a repeated offer
            log.info("Received repeated offer for slot [{}]. Ignoring.", allocationID);

            // return true here so that the sender will get a positive acknowledgement to the retry
            // and mark the offering as a success
            return true;
        } else {
            // the allocation has been fulfilled by another slot, reject the offer so the task executor
            // will offer the slot to the resource manager
            return false;
        }
    }

    // the AllocationID has not been seen before: create an AllocatedSlot for the newly offered slot
    final AllocatedSlot allocatedSlot = new AllocatedSlot(
        allocationID,
        taskManagerLocation,
        slotOffer.getSlotIndex(),
        slotOffer.getResourceProfile(),
        taskManagerGateway);

    // check whether we have a request waiting for this slot
    PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
    if (pendingRequest != null) {
        // we were waiting for this!
        allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);

        // try to complete the waiting request
        if (!pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot)) {
            // we could not complete the pending slot future --> try to fulfill another pending request
            allocatedSlots.remove(pendingRequest.getSlotRequestId());
            tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
        } else {
            log.debug("Fulfilled slot request [{}] with allocated slot [{}].",
                    pendingRequest.getSlotRequestId(), allocationID);
        }
    } else {
        // we were actually not waiting for this:
        //   - could be that this request had been fulfilled
        //   - we are receiving the slots from TaskManagers after becoming leaders
        tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
    }

    // we accepted the request in any case. slot will be released after it idled for
    // too long and timed out
    return true;
}
```
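The checks `offerSlot` makes before touching any pending request form a small decision table: unregistered TaskManager → reject; known AllocationID on the same physical slot (same `SlotID`) → accept as a repeated offer; known AllocationID on a different slot → reject; unknown AllocationID → accept. The sketch below encodes only these checks. `OfferDecisionSketch` is hypothetical: it identifies a slot by a TaskManager id string plus slot index (mirroring `SlotID`) and leaves out the matching against pending requests.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/** Hypothetical, simplified model of the checks offerSlot performs; not Flink's code. */
final class OfferDecisionSketch {
    enum Outcome { REJECT_UNREGISTERED_TM, ACCEPT_DUPLICATE, REJECT_ALREADY_FULFILLED, ACCEPT_NEW }

    /** A slot is identified by its TaskManager plus the slot index on it (like SlotID). */
    static final class SlotId {
        final String taskManager;
        final int index;

        SlotId(String taskManager, int index) {
            this.taskManager = taskManager;
            this.index = index;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SlotId)) return false;
            SlotId other = (SlotId) o;
            return index == other.index && taskManager.equals(other.taskManager);
        }

        @Override
        public int hashCode() {
            return Objects.hash(taskManager, index);
        }
    }

    final Set<String> registeredTaskManagers = new HashSet<>();
    // allocation id -> slot currently known to the pool (allocated or available)
    final Map<String, SlotId> knownSlots = new HashMap<>();

    Outcome offer(String allocationId, String taskManager, int slotIndex) {
        if (!registeredTaskManagers.contains(taskManager)) {
            return Outcome.REJECT_UNREGISTERED_TM;
        }
        SlotId offered = new SlotId(taskManager, slotIndex);
        SlotId existing = knownSlots.get(allocationId);
        if (existing != null) {
            // Same physical slot -> retried offer; different slot -> allocation already taken.
            return existing.equals(offered) ? Outcome.ACCEPT_DUPLICATE : Outcome.REJECT_ALREADY_FULFILLED;
        }
        // New allocation: remember it (the real pool then matches it to a pending request).
        knownSlots.put(allocationId, offered);
        return Outcome.ACCEPT_NEW;
    }
}
```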
tryFulfillSlotRequestOrMakeAvailable
```java
/**
 * Tries to fulfill with the given allocated slot a pending slot request or add the
 * allocated slot to the set of available slots if no matching request is available.
 *
 * @param allocatedSlot which shall be returned
 */
private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
    Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");

    // look for a waiting request whose resource requirements this slot can satisfy
    final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);

    if (pendingRequest != null) {
        log.debug("Fulfilling pending slot request [{}] early with returned slot [{}]",
            pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());

        // record the slot as allocated to this request (i.e. in use)
        allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
        // complete the request's future with the slot: the allocation is done
        pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
    } else {
        // no matching request: the slot becomes available
        log.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
        availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
    }
}
```
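`tryFulfillSlotRequestOrMakeAvailable` depends on `pollMatchingPendingRequest`, whose body is not part of this post. The sketch below assumes a first-fit scan: pending requests are checked in request order, the first one whose requirements the slot covers is removed and served, and otherwise the slot becomes available. `FulfillOrMakeAvailableSketch` and its two-dimensional `Profile` (CPU cores, memory in MB) are hypothetical simplifications of Flink's `ResourceProfile` matching.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: fulfill the first matching pending request, else make the slot available. */
final class FulfillOrMakeAvailableSketch {
    /** Simplified resource profile with two dimensions. */
    static final class Profile {
        final double cpuCores;
        final int memoryMb;

        Profile(double cpuCores, int memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }

        /** True if this (slot) profile can satisfy the requested profile. */
        boolean isMatching(Profile requested) {
            return cpuCores >= requested.cpuCores && memoryMb >= requested.memoryMb;
        }
    }

    // request id -> requested profile, in request order
    final LinkedHashMap<String, Profile> pendingRequests = new LinkedHashMap<>();
    // request id -> allocation id of the slot handed to it
    final Map<String, String> allocated = new LinkedHashMap<>();
    // allocation id -> profile of idle slots
    final Map<String, Profile> available = new LinkedHashMap<>();

    /** Returns the id of the request that received the slot, or null if the slot became available. */
    String tryFulfillOrMakeAvailable(String allocationId, Profile slotProfile) {
        Iterator<Map.Entry<String, Profile>> it = pendingRequests.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Profile> request = it.next();
            if (slotProfile.isMatching(request.getValue())) {
                String requestId = request.getKey();
                it.remove(); // "poll": take the request out of the pending set
                allocated.put(requestId, allocationId);
                return requestId;
            }
        }
        available.put(allocationId, slotProfile);
        return null;
    }
}
```

First-fit is the simplest policy that fits the description; it can hand a large slot to a small request, which is one reason a real implementation might choose differently.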
failAllocation

```java
@Override
public Optional<ResourceID> failAllocation(final AllocationID allocationID, final Exception cause) {

    componentMainThreadExecutor.assertRunningInMainThread();

    // is there still a pending request for this allocation?
    final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
    if (pendingRequest != null) {
        if (isBatchRequestAndFailureCanBeIgnored(pendingRequest, cause)) {
            // pending batch requests don't react to this signal --> put it back
            pendingRequests.put(pendingRequest.getSlotRequestId(), allocationID, pendingRequest);
        } else {
            // request was still pending
            failPendingRequest(pendingRequest, cause);
        }
        return Optional.empty();
    } else {
        return tryFailingAllocatedSlot(allocationID, cause);
    }

    // TODO: add some unit tests when the previous two are ready, the allocation may fail at any phase
}
```
tryFailingAllocatedSlot
```java
private Optional<ResourceID> tryFailingAllocatedSlot(AllocationID allocationID, Exception cause) {
    // look up the failed slot, first among the available slots, then among the allocated ones
    AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID);
    if (allocatedSlot == null) {
        allocatedSlot = allocatedSlots.remove(allocationID);
    }

    if (allocatedSlot != null) {
        log.debug("Failed allocated slot [{}]: {}", allocationID, cause.getMessage());

        // notify TaskExecutor about the failure
        allocatedSlot.getTaskManagerGateway().freeSlot(allocationID, cause, rpcTimeout);
        // release the slot.
        // since it is not in 'allocatedSlots' any more, it will be dropped on return
        allocatedSlot.releasePayload(cause);

        // if the TaskExecutor holds no other slot of this pool, return its id to the caller
        final ResourceID taskManagerId = allocatedSlot.getTaskManagerId();
        if (!availableSlots.containsTaskManager(taskManagerId) && !allocatedSlots.containResource(taskManagerId)) {
            return Optional.of(taskManagerId);
        }
    }

    return Optional.empty();
}
```
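`failAllocation` therefore has three outcomes: a pending batch request that may ignore the failure stays pending, any other pending request is failed, and a slot already held by the pool is removed, after which the TaskManager's id is returned only if it holds no other slot of this pool. The sketch below models those branches with plain maps. `FailAllocationSketch` is hypothetical, and its boolean `ignorableForBatch` stands in for `isBatchRequestAndFailureCanBeIgnored`, whose exact rule is not shown in this post. Unlike the original, it leaves an ignorable batch request in place instead of removing and re-inserting it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of failAllocation's branches; not Flink's implementation. */
final class FailAllocationSketch {
    /** Pending request; batch requests may ignore failure signals. */
    static final class Pending {
        final boolean batch;
        boolean failed;

        Pending(boolean batch) {
            this.batch = batch;
        }
    }

    final Map<String, Pending> pendingByAllocation = new HashMap<>();
    // allocation id -> task manager id, for slots the pool already holds
    final Map<String, String> slotsByAllocation = new HashMap<>();

    /** Returns the TaskManager that no longer holds any slot of this pool, if the failure emptied it. */
    Optional<String> failAllocation(String allocationId, boolean ignorableForBatch) {
        Pending pending = pendingByAllocation.get(allocationId);
        if (pending != null) {
            if (!(pending.batch && ignorableForBatch)) {
                pendingByAllocation.remove(allocationId);
                pending.failed = true; // the real pool fails the request's future
            }
            return Optional.empty(); // an ignorable batch request simply stays pending
        }
        String taskManager = slotsByAllocation.remove(allocationId);
        if (taskManager != null && !slotsByAllocation.containsValue(taskManager)) {
            return Optional.of(taskManager); // the caller may now release this TaskManager
        }
        return Optional.empty();
    }
}
```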
getAvailableSlotsInformation: getting information about the available slots
```java
/**
 * Lists the currently available slots, each with the utilization of its TaskExecutor.
 *
 * @return the available slots together with their TaskExecutor's utilization
 */
@Override
@Nonnull
public Collection<SlotInfoWithUtilization> getAvailableSlotsInformation() {
    final Map<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager = availableSlots.getSlotsByTaskManager();
    final Map<ResourceID, Set<AllocatedSlot>> allocatedSlotsSlotsByTaskManager = allocatedSlots.getSlotsByTaskManager();

    return availableSlotsByTaskManager.entrySet().stream()
        .flatMap(entry -> {
            final int numberAllocatedSlots = allocatedSlotsSlotsByTaskManager.getOrDefault(entry.getKey(), Collections.emptySet()).size();
            final int numberAvailableSlots = entry.getValue().size();
            // utilization = allocated / (allocated + available) on this TaskExecutor
            final double taskExecutorUtilization = (double) numberAllocatedSlots / (numberAllocatedSlots + numberAvailableSlots);

            return entry.getValue().stream().map(slot -> SlotInfoWithUtilization.from(slot, taskExecutorUtilization));
        })
        .collect(Collectors.toList());
}
```
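A worked example of the utilization formula: a TaskExecutor with 3 allocated and 1 available slot reports its available slot with utilization 3 / (3 + 1) = 0.75. Only TaskExecutors that appear in `availableSlotsByTaskManager` are streamed, so a TaskExecutor whose slots are all allocated is not reported and the denominator is never zero. `UtilizationSketch` is a hypothetical stand-alone version of that per-TaskExecutor computation, using slot counts instead of slot sets.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper reproducing the per-TaskExecutor utilization computed above. */
final class UtilizationSketch {
    private UtilizationSketch() {}

    /**
     * @param allocatedPerTm number of allocated slots per TaskExecutor
     * @param availablePerTm number of available slots per TaskExecutor
     * @return utilization for each TaskExecutor that still has available slots
     */
    static Map<String, Double> utilizationByTaskExecutor(
            Map<String, Integer> allocatedPerTm, Map<String, Integer> availablePerTm) {
        Map<String, Double> result = new HashMap<>();
        // Iterate only TaskExecutors with available slots, as the stream above does.
        for (Map.Entry<String, Integer> entry : availablePerTm.entrySet()) {
            int allocated = allocatedPerTm.getOrDefault(entry.getKey(), 0);
            int available = entry.getValue();
            result.put(entry.getKey(), (double) allocated / (allocated + available));
        }
        return result;
    }
}
```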
getAllocatedSlotsInformation: getting information about all allocated slots
```java
private Collection<SlotInfo> getAllocatedSlotsInformation() {
    return allocatedSlots.listSlotInfo();
}
```
allocateAvailableSlot: assigning an available slot to a request
```java
/**
 * Assigns the slot associated with allocationID to the request identified by slotRequestId.
 *
 * @param slotRequestId identifying the requested slot
 * @param allocationID the allocation id of the requested available slot
 * @return the slot, or an empty Optional if no available slot has this allocation id
 */
@Override
public Optional<PhysicalSlot> allocateAvailableSlot(
        @Nonnull SlotRequestId slotRequestId,
        @Nonnull AllocationID allocationID) {

    componentMainThreadExecutor.assertRunningInMainThread();

    // remove the slot from availableSlots
    AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID);
    if (allocatedSlot != null) {
        // and record it as allocated to this request
        allocatedSlots.add(slotRequestId, allocatedSlot);
        return Optional.of(allocatedSlot);
    } else {
        return Optional.empty();
    }
}
```
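`allocateAvailableSlot` simply moves a slot from the available book-keeping to the allocated one, keyed by the request. The hypothetical `AllocateAvailableSketch` below shows the same move with two `HashMap`s and strings for ids; it illustrates the idea and is not Flink's `AvailableSlots`/`AllocatedSlots` classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: moving a slot from the available set to the allocated set. */
final class AllocateAvailableSketch {
    // allocation id -> slot description, for idle slots
    final Map<String, String> availableSlots = new HashMap<>();
    // slot request id -> allocation id, for slots handed out to requests
    final Map<String, String> allocatedSlots = new HashMap<>();

    Optional<String> allocateAvailableSlot(String slotRequestId, String allocationId) {
        String slot = availableSlots.remove(allocationId); // like availableSlots.tryRemove(...)
        if (slot == null) {
            return Optional.empty(); // nothing available under this allocation id
        }
        allocatedSlots.put(slotRequestId, allocationId);
        return Optional.of(slot);
    }
}
```

Because the slot is removed from the available map first, a second request for the same allocation id gets an empty result instead of a slot that is already in use.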
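requestNewAllocatedSlot: requests the allocation of a new slot from the resource manager. This method does not return a slot that is already available in the pool; instead, it adds a new slot to the pool, which is allocated immediately and returned.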
```java
/**
 * Requests a new slot from the ResourceManager. This method does not return a slot that is already
 * available in the pool; instead, it adds a new slot to the pool, which is allocated immediately and returned.
 *
 * @param slotRequestId identifying the requested slot
 * @param resourceProfile resource profile that specifies the resource requirements for the requested slot
 * @param timeout timeout for the allocation procedure
 * @return a future that completes with the newly allocated slot
 */
@Nonnull
@Override
public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
        @Nonnull SlotRequestId slotRequestId,
        @Nonnull ResourceProfile resourceProfile,
        Time timeout) {

    componentMainThreadExecutor.assertRunningInMainThread();

    final PendingRequest pendingRequest = PendingRequest.createStreamingRequest(slotRequestId, resourceProfile);

    // register request timeout
    FutureUtils
        .orTimeout(
            pendingRequest.getAllocatedSlotFuture(),
            timeout.toMilliseconds(),
            TimeUnit.MILLISECONDS,
            componentMainThreadExecutor)
        .whenComplete(
            (AllocatedSlot ignored, Throwable throwable) -> {
                if (throwable instanceof TimeoutException) {
                    timeoutPendingSlotRequest(slotRequestId);
                }
            });

    return requestNewAllocatedSlotInternal(pendingRequest)
        .thenApply(Function.identity());
}
```
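The timeout registration in `requestNewAllocatedSlot` follows a common pattern: attach a timeout to the request's future and, if the future fails with a `TimeoutException`, drop the request from the pending set. The sketch below reproduces that pattern with the JDK's `CompletableFuture.orTimeout` (Java 9+) instead of Flink's `FutureUtils.orTimeout`; `RequestTimeoutSketch` is hypothetical. Since the JDK fires the timeout on its own delay thread rather than the JobMaster main thread, the sketch uses a `ConcurrentHashMap`, and it returns the future produced by `whenComplete` so a caller only sees the result after the clean-up has run.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: register a pending request and remove it again if it times out. */
final class RequestTimeoutSketch {
    // Concurrent because the timeout callback runs on the JDK's delay thread here,
    // whereas Flink runs it on the JobMaster main thread.
    final Map<String, CompletableFuture<String>> pendingRequests = new ConcurrentHashMap<>();

    /** Returns a future that completes after the timeout bookkeeping has run. */
    CompletableFuture<String> requestNewSlot(String slotRequestId, long timeoutMs) {
        CompletableFuture<String> slotFuture = new CompletableFuture<>();
        pendingRequests.put(slotRequestId, slotFuture);
        return slotFuture
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                .whenComplete((slot, throwable) -> {
                    if (throwable instanceof TimeoutException) {
                        pendingRequests.remove(slotRequestId); // like timeoutPendingSlotRequest
                    }
                });
    }

    /** Simulates a slot offer that fulfills a pending request. */
    boolean fulfill(String slotRequestId, String slot) {
        CompletableFuture<String> future = pendingRequests.remove(slotRequestId);
        return future != null && future.complete(slot);
    }
}
```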
requestNewAllocatedSlotInternal
```java
/**
 * Requests a new slot from the ResourceManager. If there is currently no ResourceManager
 * connected, then the request is stashed and sent once a new ResourceManager is connected.
 *
 * @param pendingRequest pending slot request
 * @return An {@link AllocatedSlot} future which is completed once the slot is offered to the {@link SlotPool}
 */
@Nonnull
private CompletableFuture<AllocatedSlot> requestNewAllocatedSlotInternal(PendingRequest pendingRequest) {

    if (resourceManagerGateway == null) {
        // no ResourceManager yet: park the request in waitingForResourceManager
        stashRequestWaitingForResourceManager(pendingRequest);
    } else {
        // request a new slot from the ResourceManager
        requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
    }

    return pendingRequest.getAllocatedSlotFuture();
}
```
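requestNewAllocatedBatchSlot: requests the allocation of a new batch slot from the resource manager. Unlike a normal slot request, a batch slot request only times out if the slot pool does not contain a suitable slot. It also does not react to failure signals from the resource manager.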
```java
@Nonnull
@Override
public CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(
        @Nonnull SlotRequestId slotRequestId,
        @Nonnull ResourceProfile resourceProfile) {

    componentMainThreadExecutor.assertRunningInMainThread();

    final PendingRequest pendingRequest = PendingRequest.createBatchRequest(slotRequestId, resourceProfile);

    return requestNewAllocatedSlotInternal(pendingRequest)
        .thenApply(Function.identity());
}
```
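disableBatchSlotRequestTimeoutCheck: disables the batch slot request timeout check. Called when someone else takes over responsibility for the timeout check.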
```java
@Override
public void disableBatchSlotRequestTimeoutCheck() {
    batchSlotRequestTimeoutCheckEnabled = false;
}
```
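The batch rule "only time out while the pool contains no suitable slot" can be pictured as a clock that runs only while the request is unfulfillable and is reset whenever a suitable slot shows up. `checkBatchSlotTimeout` itself is not shown in this post, so `BatchTimeoutSketch` is a hypothetical simplification under that reading; it also models `disableBatchSlotRequestTimeoutCheck` as a flag that switches the check off.

```java
/**
 * Hypothetical sketch of a batch request timeout that only runs while no slot in the pool
 * could serve the request. Simplified; not Flink's checkBatchSlotTimeout.
 */
final class BatchTimeoutSketch {
    private boolean timeoutCheckEnabled = true;
    // Relative time (ms) since which the request has been unfulfillable; -1 while fulfillable.
    private long unfulfillableSinceMs = -1;

    void disableTimeoutCheck() {
        timeoutCheckEnabled = false;
    }

    /**
     * Called periodically.
     *
     * @param poolHasSuitableSlot whether any slot the pool holds could serve the request
     * @return true if the request should now be failed with a timeout
     */
    boolean check(boolean poolHasSuitableSlot, long nowMs, long batchTimeoutMs) {
        if (!timeoutCheckEnabled) {
            return false; // someone else has taken over timeout handling
        }
        if (poolHasSuitableSlot) {
            unfulfillableSinceMs = -1; // the request may still be served later: reset the clock
            return false;
        }
        if (unfulfillableSinceMs < 0) {
            unfulfillableSinceMs = nowMs; // start counting at the first unfulfillable check
        }
        return nowMs - unfulfillableSinceMs >= batchTimeoutMs;
    }
}
```

With a 1,000 ms timeout, a request that is unfulfillable at t=0, fulfillable at t=800 and unfulfillable again from t=1,200 only times out at t=2,200, because the clock restarted at t=1,200.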
createAllocatedSlotReport: creates a report about the allocated slots that belong to the specified task manager.
```java
/**
 * Creates a report about the allocated slots that belong to the specified task manager.
 *
 * @param taskManagerId identifies the task manager
 * @return the report, covering both the available and the allocated slots of that task manager
 */
@Override
public AllocatedSlotReport createAllocatedSlotReport(ResourceID taskManagerId) {
    final Set<AllocatedSlot> availableSlotsForTaskManager = availableSlots.getSlotsForTaskManager(taskManagerId);
    final Set<AllocatedSlot> allocatedSlotsForTaskManager = allocatedSlots.getSlotsForTaskManager(taskManagerId);

    List<AllocatedSlotInfo> allocatedSlotInfos = new ArrayList<>(
            availableSlotsForTaskManager.size() + allocatedSlotsForTaskManager.size());
    for (AllocatedSlot allocatedSlot : Iterables.concat(availableSlotsForTaskManager, allocatedSlotsForTaskManager)) {
        allocatedSlotInfos.add(
                new AllocatedSlotInfo(allocatedSlot.getPhysicalSlotNumber(), allocatedSlot.getAllocationId()));
    }
    return new AllocatedSlotReport(jobId, allocatedSlotInfos);
}
```
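`createAllocatedSlotReport` covers both slot sets of one TaskManager: available and allocated slots are concatenated (with Guava's `Iterables.concat`) and turned into (physical slot number, allocation id) entries. The hypothetical `SlotReportSketch` below does the same with the JDK's `Stream.concat` and a small `SlotInfo` stand-in for `AllocatedSlotInfo`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch: building a per-TaskManager slot report from available and allocated slots. */
final class SlotReportSketch {
    /** Stand-in for AllocatedSlotInfo: physical slot index plus allocation id. */
    static final class SlotInfo {
        final int slotIndex;
        final String allocationId;

        SlotInfo(int slotIndex, String allocationId) {
            this.slotIndex = slotIndex;
            this.allocationId = allocationId;
        }

        @Override
        public String toString() {
            return slotIndex + ":" + allocationId;
        }
    }

    /** Available slots first, then allocated ones, mirroring the Iterables.concat order above. */
    static List<SlotInfo> report(Set<SlotInfo> availableForTm, Set<SlotInfo> allocatedForTm) {
        return Stream.concat(availableForTm.stream(), allocatedForTm.stream())
                .collect(Collectors.toCollection(ArrayList::new));
    }
}
```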