赞
踩
本文基于flink-1.17版本,对于flink源码学习了解,仅作为个人学习笔记,如有错误,欢迎指正。
看以下流程时,请及时参考本图
提交命令
通过flink on yarn per-job模式提交,查看flink脚本可以看到,程序被提交后,会寻找CliFrontend类
CliFrotend n main方法入口
其中:parseAndRun的run中:
获取有效配置中:
执行程序中:
执行用户代码:
env.execute() ->StreamExecutionEnvironment.execute()
获取StreamGraph StreamExecutionEnvironment类
获取PipelineExecutor执行器 执行
生成jobGraph YarnJobClusterExecutor类
其中1:
创建yarnClient 生成YarnClusterDescriptor YarnClusterClientFactory类
其中2:
启动am ApplicationMaster
YarnJobClusterEntrypoint 启动入口 main方法
工厂类构建和对象创建
对象创建:
创建resourceManager
创建并启动dispatcher 启动resourceManager
其中:创建并启动dispatcher
DefaultDispatcherRunnerFactory -> createDispatcherRunner
DefaultDispatcherRunner -> create
DefaultDispatcherRunner -> start
StandaloneLeaderElectionService -> start
DefaultDispatcherRunner -> grantLeadership
DefaultDispatcherRunner -> startNewDispatcherLeaderProcess
AbstractDispatcherLeaderProcess -> start
AbstractDispatcherLeaderProcess -> startInternal
JobDispatcherLeaderProcess -> onStart
DefaultDispatcherGatewayServiceFactory -> create
DefaultDispatcherGatewayServiceFactory
Dispatcher -> onStart
其中:dispatcher服务启动主要进行注册
其中:启动jobMaster
其中: 启动resourceManager
ResourceManagerServiceImpl -> grantLeadership
ResourceManagerServiceImpl -> startNewLeaderResourceManager
ResourceManagerServiceImpl -> startResourceManagerIfIsLeader
resourceManager -> start
ResourceManager -> onStart
ResourceManager -> startResourceManagerServices
initializate():
创建yarn的reasouceManager和nodeManager的client 并启动
其中:启动jobMaster
JobMaster -> onStart
真正启动jobMaster
建立连接,开始请求资源
StandaloneLeaderRetrievalService -> start
JobMaster -> notifyOfNewResourceManagerLeader
JobMaster -> reconnectToResourceManager
JobMaster -> tryConnectToResourceManager
JobMaster -> connectToResourceManager
RegisteredRpcConnection -> start
注册成功调用 onRegistrationSuccess(), 向 ResourceManager 进行 slot 的申请
JobMaster -> onRegistrationSuccess
JobMaster -> establishResourceManagerConnection
DeclarativeSlotPoolService -> connectToResourceManager
ResourceManager -> declareRequiredResources
FineGrainedSlotManager -> processResourceRequirements
其中:notifyResourceRequirements
其中:checkResourceRequirementsWithDelay
其中 tryFulfillRequirements
tryFulfilledRequirementWithResource
其中:分配资源allocateSlotsAccordingTo
其中:declareNeededResourcesWithDelay
FineGrainedSlotManager -> declareNeededResourcesWithDelay
FineGrainedSlotManager -> declareNeededResources
ActiveResourceManager -> declareResourceNeeded
ActiveResourceManager -> declareResourceNeeded
ActiveResourceManager -> checkResourceDeclarations
ActiveResourceManager -> requestNewWorker
YarnResourceManagerDriver -> requestResource
YarnResourceManagerDriver -> addContainerRequest
请求容器 进行回调函数YarnResourceManagerDriver ->onContainersAllocated
2. YarnResourceManagerDriver ->onContainersOfPriorityAllocated
YarnResourceManagerDriver -> startTaskExecutorInContainerAsync
YarnResourceManagerDriver -> createTaskExecutorLaunchContext
Utils.createTaskExecutorContext
所以 能够 启动TaskExecutorRunner main方法
TaskExecutorRunner -> runTaskManagerProcessSecurely
TaskExecutorRunner -> runTaskManagerProcessSecurely
TaskExecutorRunner -> runTaskManager
TaskExecutorToServiceAdapter -> start
TaskExecutor onstart方法:
TaskExecutor -> onStart
TaskExecutor -> startTaskExecutorServices
其中resourceManagerLeaderRetriever.start
其中:newRegistration.startRegistration(); // todo invokeRegistration
RetryingRegistration -> startRegistration
RetryingRegistration -> register
RetryingRegistration -> invokeOperation
TaskExecutorToResourceManagerConnection -> invokeRegistration
registerTaskExecutor registerTaskExecutorInternal
注册成功调用 onRegistrationSuccess(), 向 ResourceManager 进行 slot 的申请
TaskExecutorToResourceManagerConnection -> onRegistrationSuccess
ResourceManagerRegistrationListener -> onRegistrationSuccess
ResourceManagerRegistrationListener -> establishResourceManagerConnection
ResourceManager -> sendSlotReport
FineGrainedSlotManager -> registerTaskManager 进行allocateSlot
DefaultSlotStatusSyncer -> allocateSlot
TaskExecutor(!!!) -> requestSlot
TaskExecutor -> requestSlot
其中 allocateSlotForJob:
其中:offerSlotsToJobManager
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。