
[Flink] Flink Resources: Slot, SlotPool, offerSlots







2.1. Introduction

SlotPool is the pool the JobMaster uses to manage slots. It is an interface that defines the slot-management operations…


2.1.1. Lifecycle-related methods


2.1.2 ResourceManager connection-related methods

registerTaskManager: registers a TaskExecutor under the given ResourceID.

2.1.3 Slot operations

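failAllocation: marks the slot with the given allocation ID as failed.
getAvailableSlotsInformation: returns information about the currently available slots.
allocateAvailableSlot: allocates the available slot with the given allocation ID to the given slot request ID. If no slot with that allocation ID is available, nothing is allocated (the SlotPoolImpl code in 3.4.5 returns an empty Optional).
requestNewAllocatedSlot: requests a new slot from the ResourceManager.
requestNewAllocatedBatchSlot: requests a new batch slot from the ResourceManager.
createAllocatedSlotReport: creates a report of the allocated slots that belong to the given TaskManager.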

3. SlotPoolImpl: the implementation class

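SlotPoolImpl is the implementation of the SlotPool interface.

The slot pool serves the slot requests issued by the {@link ExecutionGraph}.

The slot pool also keeps every slot that was offered to it and accepted. As a result, it can still hand out registered idle slots even when the ResourceManager is down.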




3.1. Fields

	/** The interval (in milliseconds) in which the SlotPool writes its slot distribution on debug level. */
	private static final long STATUS_LOG_INTERVAL_MS = 60_000;

	private final JobID jobId;

	/** All registered TaskManagers, slots will be accepted and used only if the resource is registered.
	 * Slots offered by any other TaskManager are rejected.
	 * */
	private final HashSet<ResourceID> registeredTaskManagers;

	/** The book-keeping of all allocated slots.
	 * These slots belong to this JobManager and are currently assigned to a slot request.
	 * */
	private final AllocatedSlots allocatedSlots;

	/** The book-keeping of all available slots.
	 * These slots are already allocated to this JobManager but carry no payload yet.
	 * */
	private final AvailableSlots availableSlots;

	/** All pending requests waiting for slots.
	 * These requests have already been sent to the ResourceManager.
	 * */
	private final DualKeyLinkedMap<SlotRequestId, AllocationID, PendingRequest> pendingRequests;

	/** The requests that are waiting for the resource manager to be connected.
	 * These requests have not been sent yet because no connection to the ResourceManager exists.
	 * */
	private final LinkedHashMap<SlotRequestId, PendingRequest> waitingForResourceManager;

	/** Timeout for external request calls (e.g. to the ResourceManager or the TaskExecutor). */
	private final Time rpcTimeout;

	/** Timeout for releasing idle slots. */
	private final Time idleSlotTimeout;

	/** Timeout for batch slot requests. */
	private final Time batchSlotTimeout;

	private final Clock clock;

	/** the fencing token of the job manager. */
	private JobMasterId jobMasterId;

	/** The gateway to communicate with resource manager. */
	private ResourceManagerGateway resourceManagerGateway;

	private String jobManagerAddress;

	// Executor of the component's main thread
	private ComponentMainThreadExecutor componentMainThreadExecutor;
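pendingRequests is declared as a DualKeyLinkedMap. That means one PendingRequest can be looked up either by its SlotRequestId or by its AllocationID, and the code later in this article calls removeKeyB(allocationID) and keySetB() on it. The class below is a minimal, hypothetical sketch of such a two-key, insertion-ordered map, meant only to illustrate the idea. It is not Flink's class. Apart from removeKeyB and keySetB, the method names are invented for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical two-key map in the spirit of DualKeyLinkedMap: every value is reachable
 * by key A or by key B, and iteration follows insertion order.
 * Only removeKeyB/keySetB mirror names used in the article; the rest is illustrative.
 */
final class TwoKeyMap<A, B, V> {
	/** One stored value together with both of its keys. */
	private final class Node {
		final A keyA;
		final B keyB;
		final V value;

		Node(A keyA, B keyB, V value) {
			this.keyA = keyA;
			this.keyB = keyB;
			this.value = value;
		}
	}

	private final Map<A, Node> byA = new LinkedHashMap<>();  // key A -> node, insertion order
	private final Map<B, A> aByB = new LinkedHashMap<>();     // key B -> key A, insertion order

	void put(A keyA, B keyB, V value) {
		removeKeyA(keyA);  // drop any previous mapping that uses either key
		removeKeyB(keyB);
		byA.put(keyA, new Node(keyA, keyB, value));
		aByB.put(keyB, keyA);
	}

	V getKeyA(A keyA) {
		Node node = byA.get(keyA);
		return node == null ? null : node.value;
	}

	V getKeyB(B keyB) {
		A keyA = aByB.get(keyB);
		return keyA == null ? null : getKeyA(keyA);
	}

	V removeKeyA(A keyA) {
		Node node = byA.remove(keyA);
		if (node == null) {
			return null;
		}
		aByB.remove(node.keyB);
		return node.value;
	}

	V removeKeyB(B keyB) {
		A keyA = aByB.remove(keyB);
		if (keyA == null) {
			return null;
		}
		return byA.remove(keyA).value;
	}

	/** Snapshot of all B keys in insertion order. */
	Set<B> keySetB() {
		return new LinkedHashSet<>(aByB.keySet());
	}

	int size() {
		return byA.size();
	}
}
```

Storing the A key under each B key keeps both lookups O(1), and removing by either key cleans up the other index.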

3.2 Lifecycle-related methods


3.2.1 start

	/**
	 * Start the slot pool to accept RPC calls.
	 *
	 * @param jobMasterId The necessary leader id for running the job.
	 * @param newJobManagerAddress for the slot requests which are sent to the resource manager
	 * @param componentMainThreadExecutor The main thread executor for the job master's main thread.
	 */
	public void start(
		@Nonnull JobMasterId jobMasterId,
		@Nonnull String newJobManagerAddress,
		@Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {

		this.jobMasterId = jobMasterId;
		this.jobManagerAddress = newJobManagerAddress;
		this.componentMainThreadExecutor = componentMainThreadExecutor;

		// schedule the timeout checks (idle slots, batch slot requests)
		scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
		scheduleRunAsync(this::checkBatchSlotTimeout, batchSlotTimeout);

		if (log.isDebugEnabled()) {
			scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS);
		}
	}
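start() schedules checkIdleSlot to run after idleSlotTimeout. The hypothetical helper below shows roughly what such a check decides. It receives the time at which each available slot became available (compare availableSlots.add(allocatedSlot, clock.relativeTimeMillis()) in 3.4.1) and returns the slots that have been idle for longer than the timeout. It does not model freeing those slots at the TaskExecutor or rescheduling the check. The strict "longer than" comparison is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of the idle-slot check scheduled by start().
 * Keys are allocation ids (plain Strings here); values are the relative time in ms
 * at which the slot became available.
 */
final class IdleSlotCheck {

	private IdleSlotCheck() {
	}

	/** Returns the allocation ids whose slots have been available for longer than idleTimeoutMs. */
	static List<String> findIdleSlots(Map<String, Long> availableSince, long nowMs, long idleTimeoutMs) {
		List<String> idle = new ArrayList<>();
		for (Map.Entry<String, Long> entry : availableSince.entrySet()) {
			if (nowMs - entry.getValue() > idleTimeoutMs) {
				idle.add(entry.getKey());
			}
		}
		return idle;
	}
}
```

For example, with a 10 s timeout at time 60 s, slots that became available at 0 s and 40 s are idle, while one that became available at 55 s is kept.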


3.2.2 suspend

	/**
	 * Suspends this pool, meaning it has lost its authority to accept and distribute slots.
	 */
	public void suspend() {

		log.info("Suspending SlotPool.");

		// cancel all pending allocations --> we can request these slots
		// again after we regained the leadership
		Set<AllocationID> allocationIds = pendingRequests.keySetB();

		for (AllocationID allocationId : allocationIds) {
			// ask the ResourceManager to cancel the slot request
			resourceManagerGateway.cancelSlotRequest(allocationId);
		}

		// do not accept any requests
		jobMasterId = null;
		resourceManagerGateway = null;

		// Clear (but not release!) the available slots. The TaskManagers should re-register them
		// at the new leader JobManager/SlotPool
		clear();
	}

3.2.3 close

	public void close() {
		log.info("Stopping SlotPool.");
		// cancel all pending allocations
		Set<AllocationID> allocationIds = pendingRequests.keySetB();

		for (AllocationID allocationId : allocationIds) {
			resourceManagerGateway.cancelSlotRequest(allocationId);
		}

		// release all registered slots by releasing the corresponding TaskExecutors
		for (ResourceID taskManagerResourceId : registeredTaskManagers) {
			final FlinkException cause = new FlinkException(
				"Releasing TaskManager " + taskManagerResourceId + ", because of stopping of SlotPool");
			releaseTaskManagerInternal(taskManagerResourceId, cause);
		}
	}
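Both suspend() and close() cancel the pending allocations, but they differ in what happens to the TaskManagers. close() actively releases every registered TaskManager. suspend() only forgets its bookkeeping (the "clear, but not release" step) so the TaskManagers can re-register their slots with the new leader. The hypothetical model below records these side effects in lists to make the difference visible. Strings stand in for AllocationID and ResourceID, and nothing in it is Flink API.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical model contrasting suspend() and close().
 * Strings stand in for AllocationID and ResourceID; the lists record side effects.
 */
final class SlotPoolLifecycleModel {
	final Set<String> pendingAllocationIds = new LinkedHashSet<>();
	final Set<String> registeredTaskManagers = new LinkedHashSet<>();
	final List<String> cancelledAtResourceManager = new ArrayList<>();
	final List<String> releasedTaskManagers = new ArrayList<>();
	boolean acceptsRequests = true;

	/** Leadership lost: cancel pending requests, stop accepting, forget state without releasing anything. */
	void suspend() {
		cancelPending();
		acceptsRequests = false;
		pendingAllocationIds.clear();
		registeredTaskManagers.clear();  // TaskManagers are expected to re-register at the new leader
	}

	/** Shutdown: cancel pending requests and actively release every registered TaskManager. */
	void close() {
		cancelPending();
		releasedTaskManagers.addAll(registeredTaskManagers);  // stands in for releaseTaskManagerInternal
	}

	private void cancelPending() {
		cancelledAtResourceManager.addAll(pendingAllocationIds);  // stands in for cancelSlotRequest
	}
}
```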



3.3 ResourceManager connection-related methods


3.3.1 connectToResourceManager

	/**
	 * Connects to the ResourceManager and processes the blocked/pending requests…
	 *
	 * @param resourceManagerGateway  The RPC gateway for the resource manager.
	 */
	public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {
		this.resourceManagerGateway = checkNotNull(resourceManagerGateway);

		// work on all slots waiting for this connection:
		// send every stashed PendingRequest to the ResourceManager
		for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
			requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
		}

		// all sent off
		waitingForResourceManager.clear();
	}
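A request issued while no ResourceManager is connected is kept in waitingForResourceManager. connectToResourceManager() later sends these requests off and then clears the stash. The hypothetical model below captures this stash-and-flush behavior. It uses a LinkedHashMap so that stashed requests go out in arrival order, and the sent list stands in for calls to requestSlotFromResourceManager.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of waitingForResourceManager: requests made while no ResourceManager
 * is connected are stashed in arrival order and flushed once a connection exists.
 * "sent" stands in for requestSlotFromResourceManager(...).
 */
final class ResourceManagerConnectionModel {
	private final Map<String, String> waitingForResourceManager = new LinkedHashMap<>(); // slotRequestId -> profile
	final List<String> sent = new ArrayList<>();
	private boolean connected;

	void requestSlot(String slotRequestId, String resourceProfile) {
		if (!connected) {
			waitingForResourceManager.put(slotRequestId, resourceProfile);  // no ResourceManager yet: stash
		} else {
			sent.add(slotRequestId);
		}
	}

	void connectToResourceManager() {
		connected = true;
		sent.addAll(waitingForResourceManager.keySet());  // work on all stashed requests, in order
		waitingForResourceManager.clear();                 // all sent off
	}

	void disconnectResourceManager() {
		connected = false;
	}

	int stashedCount() {
		return waitingForResourceManager.size();
	}
}
```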

3.3.2 disconnectResourceManager

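Disconnects from the ResourceManager by clearing the gateway reference. Requests issued afterwards are stashed until a new ResourceManager connects.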

	public void disconnectResourceManager() {
		this.resourceManagerGateway = null;
	}


3.3.3 registerTaskManager

	/**
	 * Register TaskManager to this pool, only those slots come from registered TaskManager will be considered valid.
	 * Also it provides a way for us to keep "dead" or "abnormal" TaskManagers out of this pool.
	 *
	 * @param resourceID The id of the TaskManager
	 */
	public boolean registerTaskManager(final ResourceID resourceID) {

		log.debug("Register new TaskExecutor {}.", resourceID);
		return registeredTaskManagers.add(resourceID);
	}


3.3.4 releaseTaskManager

	/**
	 * Unregister TaskManager from this pool, all the related slots will be released and tasks be canceled. Called
	 * when we find some TaskManager becomes "dead" or "abnormal", and we decide not to use slots from it anymore.
	 *
	 * @param resourceId The id of the TaskManager
	 * @param cause for the releasing of the TaskManager
	 */
	public boolean releaseTaskManager(final ResourceID resourceId, final Exception cause) {

		if (registeredTaskManagers.remove(resourceId)) {
			releaseTaskManagerInternal(resourceId, cause);
			return true;
		} else {
			return false;
		}
	}
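registerTaskManager and releaseTaskManager maintain the set that decides which slot offers are valid. The model below is hypothetical. Strings replace ResourceID and AllocationID. Dropping the TaskManager's entry from slotsByTaskManager stands in for releaseTaskManagerInternal, which according to the Javadoc also releases the related slots and cancels their tasks.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical model of registerTaskManager/releaseTaskManager. Strings stand in for
 * ResourceID and AllocationID; slotsByTaskManager stands in for the slots the pool holds.
 */
final class TaskManagerRegistry {
	private final Set<String> registeredTaskManagers = new HashSet<>();
	final Map<String, Set<String>> slotsByTaskManager = new HashMap<>();

	/** Returns true if the TaskManager was not registered before. */
	boolean registerTaskManager(String resourceId) {
		return registeredTaskManagers.add(resourceId);
	}

	/** Only slots offered by registered TaskManagers are considered valid. */
	boolean isValidSource(String resourceId) {
		return registeredTaskManagers.contains(resourceId);
	}

	/** Unregisters the TaskManager and drops all of its slots; returns false if it was unknown. */
	boolean releaseTaskManager(String resourceId) {
		if (registeredTaskManagers.remove(resourceId)) {
			slotsByTaskManager.remove(resourceId);  // stands in for releaseTaskManagerInternal(resourceId, cause)
			return true;
		}
		return false;
	}
}
```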

3.4 Slot operations


3.4.1 offerSlots

	/**
	 * Slot offering by TaskExecutor with AllocationID. The AllocationID is originally generated by this pool and
	 * transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation
	 * we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending
	 * request waiting for this slot (maybe fulfilled by some other returned slot).
	 *
	 * @param taskManagerLocation location from where the offer comes from
	 * @param taskManagerGateway TaskManager gateway
	 * @param slotOffer the offered slot
	 * @return True if we accept the offering
	 */
	boolean offerSlot(
			final TaskManagerLocation taskManagerLocation,
			final TaskManagerGateway taskManagerGateway,
			final SlotOffer slotOffer) {

		// check if this TaskManager is valid
		final ResourceID resourceID = taskManagerLocation.getResourceID();
		final AllocationID allocationID = slotOffer.getAllocationId();

		// the offer must come from a registered TaskManager
		if (!registeredTaskManagers.contains(resourceID)) {
			log.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
					slotOffer.getAllocationId(), taskManagerLocation);
			return false;
		}

		// check whether we have already using this slot,
		// i.e. whether the AllocationID of this offer is already known to the SlotPool
		AllocatedSlot existingSlot;
		if ((existingSlot = allocatedSlots.get(allocationID)) != null ||
			(existingSlot = availableSlots.get(allocationID)) != null) {

			// we need to figure out if this is a repeated offer for the exact same slot,
			// or another offer that comes from a different TaskManager after the ResourceManager
			// re-tried the request

			// we write this in terms of comparing slot IDs, because the Slot IDs are the identifiers of
			// the actual slots on the TaskManagers
			// Note: The slotOffer should have the SlotID
			final SlotID existingSlotId = existingSlot.getSlotId();
			final SlotID newSlotId = new SlotID(taskManagerLocation.getResourceID(), slotOffer.getSlotIndex());

			if (existingSlotId.equals(newSlotId)) {
				// the SlotPool already accepted this very slot: the TaskExecutor sent a repeated offer
				log.info("Received repeated offer for slot [{}]. Ignoring.", allocationID);

				// return true here so that the sender will get a positive acknowledgement to the retry
				// and mark the offering as a success
				return true;
			} else {
				// another AllocatedSlot is already associated with this AllocationID, so this slot is rejected:
				// the allocation has been fulfilled by another slot, reject the offer so the task executor
				// will offer the slot to the resource manager
				return false;
			}
		}

		// the AllocationID of this slot has not been seen before:
		// create an AllocatedSlot that represents the newly allocated slot
		final AllocatedSlot allocatedSlot = new AllocatedSlot(
			allocationID,
			taskManagerLocation,
			slotOffer.getSlotIndex(),
			slotOffer.getResourceProfile(),
			taskManagerGateway);

		// check whether we have request waiting for this slot (one associated with this AllocationID)
		PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
		if (pendingRequest != null) {
			// we were waiting for this!
			allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);

			if (!pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot)) {
				// we could not complete the pending slot future --> try to fulfill another pending request,
				// using the slot for the waiting requests in request order
				allocatedSlots.remove(pendingRequest.getSlotRequestId());
				tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
			} else {
				log.debug("Fulfilled slot request [{}] with allocated slot [{}].", pendingRequest.getSlotRequestId(), allocationID);
			}
		}
		else {
			// we were actually not waiting for this:
			//   - could be that this request had been fulfilled
			//   - we are receiving the slots from TaskManagers after becoming leaders
			tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
		}

		// we accepted the request in any case. slot will be released after it idled for
		// too long and timed out
		return true;
	}
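The branches of offerSlot() can be summarized as a small decision procedure. The hypothetical model below keeps only the bookkeeping involved: the registered TaskManagers, the AllocationID to SlotID mapping of slots already held (allocated or available), and the AllocationIDs of pending requests. A SlotID is modeled as "resourceId#slotIndex", mirroring new SlotID(resourceID, slotIndex). The model ignores the corner case in which completing the pending request's future fails.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, simplified model of the checks offerSlot() performs.
 * Strings stand in for ResourceID and AllocationID; a SlotID is "resourceId#slotIndex".
 */
final class OfferSlotModel {
	enum Outcome { REJECT_UNREGISTERED, ACCEPT_REPEATED, REJECT_CONFLICT, ACCEPT_FULFILLED, ACCEPT_AVAILABLE }

	final Set<String> registeredTaskManagers = new HashSet<>();
	/** allocationId -> slotId of every slot the pool already holds (allocated or available). */
	final Map<String, String> knownSlots = new HashMap<>();
	/** allocationIds of pending requests already sent to the ResourceManager. */
	final Set<String> pendingAllocationIds = new HashSet<>();

	Outcome offerSlot(String resourceId, int slotIndex, String allocationId) {
		if (!registeredTaskManagers.contains(resourceId)) {
			return Outcome.REJECT_UNREGISTERED;  // offer from an unregistered TaskManager
		}
		String newSlotId = resourceId + "#" + slotIndex;
		String existingSlotId = knownSlots.get(allocationId);
		if (existingSlotId != null) {
			// AllocationID seen before: repeated offer of the same slot, or a different slot
			return existingSlotId.equals(newSlotId) ? Outcome.ACCEPT_REPEATED : Outcome.REJECT_CONFLICT;
		}
		knownSlots.put(allocationId, newSlotId);
		if (pendingAllocationIds.remove(allocationId)) {
			return Outcome.ACCEPT_FULFILLED;     // a pending request was waiting for exactly this slot
		}
		return Outcome.ACCEPT_AVAILABLE;         // kept anyway: serves another request or becomes available
	}
}
```

Every path that reaches the end accepts the slot. Only an unknown TaskManager or an AllocationID already bound to a different physical slot leads to rejection.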


	/**
	 * Tries to fulfill with the given allocated slot a pending slot request or add the
	 * allocated slot to the set of available slots if no matching request is available.
	 *
	 * @param allocatedSlot which shall be returned
	 */
	private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
		Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");

		// find a waiting request whose resource requirements match this AllocatedSlot
		final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);

		if (pendingRequest != null) {
			// a matching request exists: hand the AllocatedSlot to it
			log.debug("Fulfilling pending slot request [{}] early with returned slot [{}]",
				pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());

			// record the slot as allocated (in use) under the request's SlotRequestId
			allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);

			// complete the request's future with the slot, which marks the allocation as done
			pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
		} else {
			// no matching request: return the slot to the set of available slots
			log.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
			availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
		}
	}
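tryFulfillSlotRequestOrMakeAvailable() hands a slot to the first waiting request it can serve. If no request matches, the slot goes into the available set. The sketch below is hypothetical. It keeps pending requests in a LinkedHashMap to preserve request order, and it assumes a deliberately simple matching rule: a slot can serve a request if it offers at least the requested CPU and memory. Flink's real matching goes through ResourceProfile and pollMatchingPendingRequest, and its exact rules are not shown in this article.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical model of tryFulfillSlotRequestOrMakeAvailable(): a returned slot goes to the
 * first waiting request (in request order) it can serve; otherwise it becomes available.
 * ASSUMPTION: a slot "matches" if it offers at least the requested CPU and memory.
 */
final class SlotMatcher {
	static final class Profile {
		final double cpuCores;
		final int memoryMb;

		Profile(double cpuCores, int memoryMb) {
			this.cpuCores = cpuCores;
			this.memoryMb = memoryMb;
		}

		boolean canServe(Profile required) {
			return cpuCores >= required.cpuCores && memoryMb >= required.memoryMb;
		}
	}

	final Map<String, Profile> pendingRequests = new LinkedHashMap<>();  // slotRequestId -> required profile
	final Map<String, String> allocatedSlots = new LinkedHashMap<>();    // allocationId -> slotRequestId
	final Map<String, Profile> availableSlots = new LinkedHashMap<>();   // allocationId -> slot profile

	void tryFulfillOrMakeAvailable(String allocationId, Profile slotProfile) {
		Iterator<Map.Entry<String, Profile>> it = pendingRequests.entrySet().iterator();
		while (it.hasNext()) {
			Map.Entry<String, Profile> request = it.next();
			if (slotProfile.canServe(request.getValue())) {
				it.remove();                                          // poll the matching request
				allocatedSlots.put(allocationId, request.getKey());  // the slot is now in use
				return;
			}
		}
		availableSlots.put(allocationId, slotProfile);             // nobody needs it right now
	}
}
```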


3.4.2 failAllocation

	public Optional<ResourceID> failAllocation(final AllocationID allocationID, final Exception cause) {

		// look up (and remove) the PendingRequest of this allocation
		final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
		if (pendingRequest != null) {
			if (isBatchRequestAndFailureCanBeIgnored(pendingRequest, cause)) {
				// pending batch requests don't react to this signal --> put it back
				pendingRequests.put(pendingRequest.getSlotRequestId(), allocationID, pendingRequest);
			} else {
				// request was still pending
				failPendingRequest(pendingRequest, cause);
			}
			return Optional.empty();
		}
		else {
			return tryFailingAllocatedSlot(allocationID, cause);
		}

		// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase
	}


	private Optional<ResourceID> tryFailingAllocatedSlot(AllocationID allocationID, Exception cause) {
		// find the failed slot, either among the available or among the allocated slots
		AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID);

		if (allocatedSlot == null) {
			allocatedSlot = allocatedSlots.remove(allocationID);
		}

		if (allocatedSlot != null) {
			log.debug("Failed allocated slot [{}]: {}", allocationID, cause.getMessage());

			// notify TaskExecutor about the failure
			allocatedSlot.getTaskManagerGateway().freeSlot(allocationID, cause, rpcTimeout);
			// release the slot.
			// since it is not in 'allocatedSlots' any more, it will be dropped on return
			allocatedSlot.releasePayload(cause);

			final ResourceID taskManagerId = allocatedSlot.getTaskManagerId();

			// the TaskManager holds no more slots of this pool: report its id to the caller
			if (!availableSlots.containsTaskManager(taskManagerId) && !allocatedSlots.containResource(taskManagerId)) {
				return Optional.of(taskManagerId);
			}
		}

		return Optional.empty();
	}
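failAllocation() first checks whether the AllocationID still belongs to a pending request. Only if it does not does the method fail a slot the pool already holds. It returns the TaskManager's ResourceID when that TaskManager no longer holds any slot of this pool. The hypothetical model below shows this routing. Whether a batch request may ignore the failure is passed in as a flag instead of reproducing isBatchRequestAndFailureCanBeIgnored, and the RPC that frees the slot on the TaskExecutor is omitted.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * Hypothetical model of failAllocation()/tryFailingAllocatedSlot().
 * Strings stand in for AllocationID and ResourceID; the batch "ignore" decision is a flag.
 */
final class FailAllocationModel {
	final Set<String> pendingAllocationIds = new HashSet<>();
	final Set<String> failedRequests = new HashSet<>();
	final Map<String, String> slotOwner = new HashMap<>();  // allocationId -> TaskManager id (allocated or available)

	Optional<String> failAllocation(String allocationId, boolean batchFailureIgnorable) {
		if (pendingAllocationIds.contains(allocationId)) {
			if (!batchFailureIgnorable) {
				pendingAllocationIds.remove(allocationId);
				failedRequests.add(allocationId);  // stands in for failPendingRequest(...)
			}
			// an ignorable batch request simply stays pending
			return Optional.empty();
		}
		String taskManagerId = slotOwner.remove(allocationId);
		if (taskManagerId != null && !slotOwner.containsValue(taskManagerId)) {
			return Optional.of(taskManagerId);    // this TaskManager no longer holds any slot of the pool
		}
		return Optional.empty();
	}
}
```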


3.4.3 getAvailableSlotsInformation


	/**
	 * Lists the currently available slots, each annotated with the utilization of its TaskExecutor.
	 */
	public Collection<SlotInfoWithUtilization> getAvailableSlotsInformation() {
		final Map<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager = availableSlots.getSlotsByTaskManager();
		final Map<ResourceID, Set<AllocatedSlot>> allocatedSlotsSlotsByTaskManager = allocatedSlots.getSlotsByTaskManager();

		return availableSlotsByTaskManager.entrySet().stream()
			.flatMap(entry -> {
				// utilization of a TaskExecutor = allocated slots / (allocated + available slots)
				final int numberAllocatedSlots = allocatedSlotsSlotsByTaskManager.getOrDefault(entry.getKey(), Collections.emptySet()).size();
				final int numberAvailableSlots = entry.getValue().size();
				final double taskExecutorUtilization = (double) numberAllocatedSlots / (numberAllocatedSlots + numberAvailableSlots);

				return entry.getValue().stream().map(slot -> SlotInfoWithUtilization.from(slot, taskExecutorUtilization));
			})
			.collect(Collectors.toList());
	}
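A worked example of the utilization formula: a TaskExecutor with 3 allocated slots and 1 available slot has utilization 3 / (3 + 1) = 0.75, and every available slot on it is reported with that value. The helper below is a hypothetical re-computation that uses plain maps (String TaskManager ids and slot counts) instead of Flink's types. Like the code above, it only covers TaskExecutors that still have available slots, so the denominator is never zero.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical re-computation of the per-TaskExecutor utilization:
 * utilization = allocated / (allocated + available), for TaskExecutors with available slots.
 */
final class UtilizationExample {
	private UtilizationExample() {
	}

	static Map<String, Double> utilizationByTaskManager(Map<String, Integer> availableCount,
			Map<String, Integer> allocatedCount) {
		Map<String, Double> result = new LinkedHashMap<>();
		for (Map.Entry<String, Integer> entry : availableCount.entrySet()) {
			int allocated = allocatedCount.getOrDefault(entry.getKey(), 0);
			int available = entry.getValue();
			result.put(entry.getKey(), (double) allocated / (allocated + available));
		}
		return result;
	}
}
```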

3.4.4 getAllocatedSlotsInformation


	private Collection<SlotInfo> getAllocatedSlotsInformation() {
		return allocatedSlots.listSlotInfo();
	}


3.4.5 allocateAvailableSlot


	/**
	 * Allocates the slot associated with allocationID to the request identified by slotRequestId.
	 *
	 * @param slotRequestId identifying the requested slot
	 * @param allocationID the allocation id of the requested available slot
	 * @return the slot, or an empty Optional if no slot with this allocation id is available
	 */
	public Optional<PhysicalSlot> allocateAvailableSlot(
		@Nonnull SlotRequestId slotRequestId,
		@Nonnull AllocationID allocationID) {

		// take the slot out of availableSlots and record it as allocated
		AllocatedSlot allocatedSlot = availableSlots.tryRemove(allocationID);
		if (allocatedSlot != null) {
			allocatedSlots.add(slotRequestId, allocatedSlot);
			return Optional.of(allocatedSlot);
		} else {
			return Optional.empty();
		}
	}
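allocateAvailableSlot() simply moves a slot from the available book into the allocated book. The hypothetical model below uses two HashMaps in place of AvailableSlots and AllocatedSlots. A second call with the same AllocationID returns an empty Optional because the slot is no longer available, which prevents the same slot from being handed out twice.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical model of allocateAvailableSlot(): the slot moves from the available map into
 * the allocated map under the requesting SlotRequestId. Strings stand in for Flink's ID types.
 */
final class AvailableSlotAllocator {
	final Map<String, String> availableSlots = new HashMap<>();  // allocationId -> slot description
	final Map<String, String> allocatedSlots = new HashMap<>();  // slotRequestId -> allocationId

	Optional<String> allocateAvailableSlot(String slotRequestId, String allocationId) {
		String slot = availableSlots.remove(allocationId);  // like availableSlots.tryRemove(...)
		if (slot == null) {
			return Optional.empty();
		}
		allocatedSlots.put(slotRequestId, allocationId);
		return Optional.of(slot);
	}
}
```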

3.4.6 requestNewAllocatedSlot

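Requests a new slot from the ResourceManager. This method does not return a slot that is already available in the pool. Instead, it adds a new slot to the pool, which is immediately allocated and returned.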

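	/**
	 * Requests a new slot from the ResourceManager. This method does not return a slot that is
	 * already available in the pool; instead it adds a new slot to the pool, which is immediately
	 * allocated and returned.
	 *
	 * @param slotRequestId identifying the requested slot
	 * @param resourceProfile resource profile that specifies the resource requirements for the requested slot
	 * @param timeout timeout for the allocation procedure
	 * @return a future of the newly allocated slot
	 */
	public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
			@Nonnull SlotRequestId slotRequestId,
			@Nonnull ResourceProfile resourceProfile,
			Time timeout) {

		final PendingRequest pendingRequest = PendingRequest.createStreamingRequest(slotRequestId, resourceProfile);

		// register request timeout
		FutureUtils
			.orTimeout(
				pendingRequest.getAllocatedSlotFuture(),
				timeout.toMilliseconds(),
				TimeUnit.MILLISECONDS,
				componentMainThreadExecutor)
			.whenComplete(
				(AllocatedSlot ignored, Throwable throwable) -> {
					if (throwable instanceof TimeoutException) {
						timeoutPendingSlotRequest(slotRequestId);
					}
				});

		return requestNewAllocatedSlotInternal(pendingRequest)
			.thenApply((Function.identity()));
	}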
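The "register request timeout" step arms a timeout on the pending request's future and, when a TimeoutException occurs, calls timeoutPendingSlotRequest. The sketch below reproduces that pattern with the JDK's CompletableFuture.orTimeout (Java 9+), which serves as a stand-in for the helper SlotPoolImpl uses. The onTimeout callback is a hypothetical substitute for timeoutPendingSlotRequest. The sketch returns the whenComplete stage, so a caller that joins it is guaranteed the callback has already run.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of the request-timeout pattern using the JDK 9+ CompletableFuture#orTimeout.
 * onTimeout plays the role of timeoutPendingSlotRequest(slotRequestId).
 */
final class RequestTimeoutSketch {
	private RequestTimeoutSketch() {
	}

	static <T> CompletableFuture<T> withRequestTimeout(
			CompletableFuture<T> slotFuture, long timeoutMs, String slotRequestId, Consumer<String> onTimeout) {
		// orTimeout completes slotFuture itself with a TimeoutException if nobody completes it in time
		return slotFuture
			.orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
			.whenComplete((ignored, throwable) -> {
				if (throwable instanceof TimeoutException) {
					onTimeout.accept(slotRequestId);
				}
			});
	}
}
```

If a slot is offered before the deadline, the future completes normally and the callback sees no exception, so the timeout handler never fires.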



	/**
	 * Requests a new slot from the ResourceManager. If there is currently no ResourceManager
	 * connected, then the request is stashed and sent once a new ResourceManager is connected.
	 *
	 * @param pendingRequest pending slot request
	 * @return An {@link AllocatedSlot} future which is completed once the slot is offered to the {@link SlotPool}
	 */
	private CompletableFuture<AllocatedSlot> requestNewAllocatedSlotInternal(PendingRequest pendingRequest) {

		if (resourceManagerGateway == null) {
			// no ResourceManager connected: stash the request until one connects
			stashRequestWaitingForResourceManager(pendingRequest);
		} else {
			// request a new slot from the ResourceManager
			requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
		}

		return pendingRequest.getAllocatedSlotFuture();
	}

3.4.7 requestNewAllocatedBatchSlot

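Requests a new batch slot from the ResourceManager. Unlike a normal slot request, a batch slot request only times out if the slot pool does not contain a suitable slot. Moreover, it does not react to failure signals from the ResourceManager.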

	public CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(
		@Nonnull SlotRequestId slotRequestId,
		@Nonnull ResourceProfile resourceProfile) {

		final PendingRequest pendingRequest = PendingRequest.createBatchRequest(slotRequestId, resourceProfile);

		return requestNewAllocatedSlotInternal(pendingRequest)
			.thenApply(Function.identity());
	}
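To illustrate the batch timeout rule stated above (a batch request only times out if the pool has no suitable slot), here is a hypothetical model. It rests on several assumptions. A slot's resources are reduced to one integer size, and a slot fits if it is at least as large as the request. Both available and allocated slots count, since an allocated slot could be freed later. The timeout clock starts at the first check that finds no suitable slot and resets whenever one appears. SlotPoolImpl's actual checkBatchSlotTimeout is not shown in this article.

```java
import java.util.Collection;

/**
 * Hypothetical model of a batch slot request's timeout rule: it only times out if, for at least
 * batchSlotTimeoutMs, the pool held no slot able to serve it.
 * ASSUMPTION: slot resources are a single size; a slot fits if size >= requested size.
 */
final class BatchRequestTimeout {
	private final int requestedSize;
	private Long unfulfillableSinceMs;  // null while some pooled slot could serve the request

	BatchRequestTimeout(int requestedSize) {
		this.requestedSize = requestedSize;
	}

	/** Periodic check over all pooled slots (available and allocated). Returns true on timeout. */
	boolean check(Collection<Integer> pooledSlotSizes, long nowMs, long batchSlotTimeoutMs) {
		boolean fulfillable = pooledSlotSizes.stream().anyMatch(size -> size >= requestedSize);
		if (fulfillable) {
			unfulfillableSinceMs = null;   // a suitable slot exists: keep waiting, no timeout
			return false;
		}
		if (unfulfillableSinceMs == null) {
			unfulfillableSinceMs = nowMs;  // first check without a suitable slot starts the clock
		}
		return nowMs - unfulfillableSinceMs >= batchSlotTimeoutMs;
	}
}
```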

3.4.8 disableBatchSlotRequestTimeoutCheck


	public void disableBatchSlotRequestTimeoutCheck() {
		batchSlotRequestTimeoutCheckEnabled = false;
	}


3.4.9 createAllocatedSlotReport

Creates a report of the allocated slots that belong to the given TaskManager.

	/**
	 * Creates a report of the allocated slots that belong to the given TaskManager.
	 *
	 * @param taskManagerId identifies the task manager
	 * @return the allocated slot report
	 */
	public AllocatedSlotReport createAllocatedSlotReport(ResourceID taskManagerId) {
		final Set<AllocatedSlot> availableSlotsForTaskManager = availableSlots.getSlotsForTaskManager(taskManagerId);
		final Set<AllocatedSlot> allocatedSlotsForTaskManager = allocatedSlots.getSlotsForTaskManager(taskManagerId);

		// the report covers both the available and the in-use slots on this TaskManager
		List<AllocatedSlotInfo> allocatedSlotInfos = new ArrayList<>(
				availableSlotsForTaskManager.size() + allocatedSlotsForTaskManager.size());
		for (AllocatedSlot allocatedSlot : Iterables.concat(availableSlotsForTaskManager, allocatedSlotsForTaskManager)) {
			allocatedSlotInfos.add(
					new AllocatedSlotInfo(allocatedSlot.getPhysicalSlotNumber(), allocatedSlot.getAllocationId()));
		}
		return new AllocatedSlotReport(jobId, allocatedSlotInfos);
	}
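createAllocatedSlotReport() collects every slot the pool holds on one TaskManager, whether available or allocated. The hypothetical model below uses a tiny SlotEntry class in place of AllocatedSlot and produces "physicalSlotNumber:allocationId" strings in place of AllocatedSlotInfo. As in the code above, available slots are listed first, followed by allocated ones.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * Hypothetical model of createAllocatedSlotReport(): the report for one TaskManager lists
 * every slot the pool holds on it, available ones first, then allocated ones.
 */
final class SlotReportExample {
	static final class SlotEntry {
		final String taskManagerId;
		final int physicalSlotNumber;
		final String allocationId;

		SlotEntry(String taskManagerId, int physicalSlotNumber, String allocationId) {
			this.taskManagerId = taskManagerId;
			this.physicalSlotNumber = physicalSlotNumber;
			this.allocationId = allocationId;
		}
	}

	private SlotReportExample() {
	}

	static List<String> report(String taskManagerId, Collection<SlotEntry> available, Collection<SlotEntry> allocated) {
		List<String> infos = new ArrayList<>();
		addSlotsOf(taskManagerId, available, infos);
		addSlotsOf(taskManagerId, allocated, infos);
		return infos;
	}

	private static void addSlotsOf(String taskManagerId, Collection<SlotEntry> slots, List<String> out) {
		for (SlotEntry slot : slots) {
			if (slot.taskManagerId.equals(taskManagerId)) {
				out.add(slot.physicalSlotNumber + ":" + slot.allocationId);
			}
		}
	}
}
```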
