The JobManager is the master node of a Flink cluster. It contains several important components:
1. ResourceManager
The cluster resource manager of Flink. There is only one per cluster, and it is responsible for all slot management and slot requests.
2. Dispatcher
Receives the JobGraph submitted by the user and then starts a JobMaster for it, similar to the AppMaster role in a YARN cluster or the Driver role of a Spark job.
3. WebMonitorEndpoint
Maintains a large number of Handlers. When a client submits a job to the Flink cluster via flink run, the request is ultimately received by the WebMonitorEndpoint, which decides which Handler should process it.
4. JobMaster/JobManager
Responsible for executing one specific job. In a cluster there may be several of these running at the same time, similar to the AppMaster role in a YARN cluster or the Driver role of a Spark job.
Two remarks on the naming:
1. If we view Flink as a master/slave architecture, then "JobManager" refers to the master node, which contains the three roles described above.
2. When a job is submitted to YARN, it can also run as a small dedicated cluster whose master node is again a JobManager. Submitting to YARN offers two modes, per-job and session, where one container runs the JobManager and other containers run the StreamTasks.
In short, the master node of a Flink cluster runs a ResourceManager and a Dispatcher. When a client submits a job to the cluster (the client builds the job into a JobGraph object), the Dispatcher is responsible for spinning up a JobManager/JobMaster that manages the execution of the tasks of that job, and the JobManager requests the resources needed to run those tasks from the ResourceManager.
Following the analysis in the previous post, the JobManager's startup main class is StandaloneSessionClusterEntrypoint:
// Entry point
StandaloneSessionClusterEntrypoint.main()
    ClusterEntrypoint.runClusterEntrypoint(entrypoint);
        clusterEntrypoint.startCluster();
            runCluster(configuration, pluginManager);
                // Step 1: initialize the various services (7 of them)
                initializeServices(configuration, pluginManager);
                // Step 2: create the DispatcherResourceManagerComponentFactory, which holds the factory
                // instances of the main components; it contains three important members:
                //   - the factory that creates the ResourceManager
                //   - the factory that creates the Dispatcher
                //   - the factory that creates the WebMonitorEndpoint
                createDispatcherResourceManagerComponentFactory(configuration);
                // Step 3: create the components the cluster needs to run:
                //   - the ResourceManager
                //   - the Dispatcher
                //   - the WebMonitorEndpoint
                clusterComponent = dispatcherResourceManagerComponentFactory.create(...)
Step 1: initializeServices() initializes a number of service components:
// Initialize and start the AkkaRpcService, which internally wraps an ActorSystem
commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService(...)
// Initialize a thread pool dedicated to I/O work
ioExecutor = Executors.newFixedThreadPool(...)
// Initialize the HA services; the implementation responsible for HA is ZooKeeperHaServices
haServices = createHaServices(configuration, ioExecutor);
// Initialize and start the BlobServer
blobServer = new BlobServer(configuration, haServices.createBlobStore());
blobServer.start();
// Initialize the heartbeat services: heartbeatServices = HeartbeatServices
heartbeatServices = createHeartbeatServices(configuration);
// Initialize a store for archived ExecutionGraphs; the implementation is FileArchivedExecutionGraphStore
archivedExecutionGraphStore = createSerializableExecutionGraphStore(...)
Step 2: createDispatcherResourceManagerComponentFactory(configuration) initializes the factory instances of several components:
1. DispatcherRunnerFactory, default implementation: DefaultDispatcherRunnerFactory
2. ResourceManagerFactory, default implementation: StandaloneResourceManagerFactory
3. RestEndpointFactory, default implementation: SessionRestEndpointFactory
Inside the DispatcherRunnerFactory, a SessionDispatcherLeaderProcessFactoryFactory is also instantiated.
The code that creates the three factories:
final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
Entering the createDispatcherResourceManagerComponentFactory method:
protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(StandaloneResourceManagerFactory.getInstance());
}
// which in turn calls the following method
public static DefaultDispatcherResourceManagerComponentFactory createSessionComponentFactory(
ResourceManagerFactory<?> resourceManagerFactory) {
return new DefaultDispatcherResourceManagerComponentFactory(
DefaultDispatcherRunnerFactory.createSessionRunner(SessionDispatcherFactory.INSTANCE), // the DispatcherRunnerFactory
resourceManagerFactory,                                                                 // the ResourceManagerFactory
SessionRestEndpointFactory.INSTANCE);                                                   // the RestEndpointFactory
}
With that, the three factories have been created.
Step 3: dispatcherResourceManagerComponentFactory.create(...) creates the three important components:
clusterComponent = dispatcherResourceManagerComponentFactory.create(
configuration,
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
this);
Entering the create method. The first component that is created and started is the webMonitorEndpoint:
/**
 * If the user submits a job via flink run, it is ultimately handled by the JobSubmitHandler
 * inside the WebMonitorEndpoint and then handed over to the Dispatcher.
 * Here the webMonitorEndpoint is created; in YARN mode a MiniDispatcherRestEndpoint is created instead.
 */
webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
    configuration,
    dispatcherGatewayRetriever,
    resourceManagerGatewayRetriever,
    blobServer,
    executor,
    metricFetcher,
    highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
    fatalErrorHandler);

log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();
The code that actually creates the WebMonitorEndpoint:
@Override
public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
        Configuration configuration,
        LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
        LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
        TransientBlobService transientBlobService,
        ScheduledExecutorService executor,
        MetricFetcher metricFetcher,
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);

    return new DispatcherRestEndpoint(
        RestServerEndpointConfiguration.fromConfiguration(configuration),
        dispatcherGatewayRetriever,
        configuration,
        restHandlerConfiguration,
        resourceManagerGatewayRetriever,
        transientBlobService,
        executor,
        metricFetcher,
        leaderElectionService,
        RestEndpointFactory.createExecutionGraphCache(restHandlerConfiguration),
        fatalErrorHandler);
}
Now look at webMonitorEndpoint.start(). The start method lives in RestServerEndpoint: webMonitorEndpoint extends RestServerEndpoint, so the start() that actually runs is the parent class's method:
/**
 * Starts this REST server endpoint.
 *
 * @throws Exception if we cannot start the RestServerEndpoint
 */
public final void start() throws Exception {
    synchronized (lock) {
        Preconditions.checkState(state == State.CREATED, "The RestServerEndpoint cannot be restarted.");

        log.info("Starting rest endpoint.");

        // initialize a router
        final Router router = new Router();
        final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();

        // initialize all the handlers
        handlers = initializeHandlers(restAddressFuture);

        /* sort the handlers such that they are ordered the following:
         * /jobs
         * /jobs/overview
         * /jobs/:jobid
         * /jobs/:jobid/config
         * /:*
         */
        Collections.sort(
            handlers,
            RestHandlerUrlComparator.INSTANCE);

        // check uniqueness
        checkAllEndpointsAndHandlersAreUnique(handlers);
        handlers.forEach(handler -> registerHandler(router, handler, log)); // register each handler with the router

        // the usual Netty bootstrap routine
        ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) {
                RouterHandler handler = new RouterHandler(router, responseHeaders);

                // SSL should be the first handler in the pipeline
                if (isHttpsEnabled()) {
                    ch.pipeline().addLast("ssl",
                        new RedirectingSslHandler(restAddress, restAddressFuture, sslHandlerFactory));
                }

                ch.pipeline()
                    .addLast(new HttpServerCodec())
                    .addLast(new FileUploadHandler(uploadDir))
                    .addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders))
                    .addLast(new ChunkedWriteHandler())
                    .addLast(handler.getName(), handler)
                    .addLast(new PipelineErrorHandler(log, responseHeaders));
            }
        };

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));

        bootstrap = new ServerBootstrap();
        bootstrap
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(initializer);

        Iterator<Integer> portsIterator;
        try {
            portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
        } catch (IllegalConfigurationException e) {
            throw e;
        } catch (Exception e) {
            throw new IllegalArgumentException("Invalid port range definition: " + restBindPortRange);
        }

        int chosenPort = 0;
        while (portsIterator.hasNext()) {
            try {
                chosenPort = portsIterator.next();
                final ChannelFuture channel;
                if (restBindAddress == null) {
                    channel = bootstrap.bind(chosenPort);
                } else {
                    channel = bootstrap.bind(restBindAddress, chosenPort);
                }
                serverChannel = channel.syncUninterruptibly().channel();
                break;
            } catch (final Exception e) {
                // continue if the exception is due to the port being in use, fail early otherwise
                if (!(e instanceof org.jboss.netty.channel.ChannelException || e instanceof java.net.BindException)) {
                    throw e;
                }
            }
        }

        if (serverChannel == null) {
            throw new BindException("Could not start rest endpoint on any port in port range " + restBindPortRange);
        }

        log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);

        final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
        final String advertisedAddress;
        if (bindAddress.getAddress().isAnyLocalAddress()) {
            advertisedAddress = this.restAddress;
        } else {
            advertisedAddress = bindAddress.getAddress().getHostAddress();
        }
        final int port = bindAddress.getPort();

        log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);

        restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();

        restAddressFuture.complete(restBaseUrl);

        // startup is complete
        state = State.RUNNING;

        // call into the subclass; different implementations behave differently here
        startInternal();
    }
}
Finally startInternal() is called. It is declared as an abstract method in the parent class and implemented by the subclass; the parent's start() calls into this abstract method:
@Override
public void startInternal() throws Exception {
//leader election service: let the current contender (this) take part in the election
leaderElectionService.start(this);
//start a periodic task that cleans up the ExecutionGraphCache
startExecutionGraphCacheCleanupTask();
if (hasWebUI) {
log.info("Web frontend listening at {}.", getRestBaseUrl());
}
}
The purpose of the election here is to write the server's address into ZooKeeper, because the server's port is chosen at random. The election serves two purposes: first, HA of the component itself; second, service discovery, since clients need to look up the server address in ZooKeeper. Now enter leaderElectionService.start(this):
@Override
public final void start(LeaderContender contender) throws Exception {
checkNotNull(contender, "Contender must not be null.");
Preconditions.checkState(leaderContender == null, "Contender was already set.");
synchronized (lock) {
leaderContender = contender;
leaderElectionDriver = leaderElectionDriverFactory.createLeaderElectionDriver(
this, new LeaderElectionFatalErrorHandler(), leaderContender.getDescription());
LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
running = true;
}
}
Entering createLeaderElectionDriver:
@Override
public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
LeaderElectionEventHandler leaderEventHandler,
FatalErrorHandler fatalErrorHandler,
String leaderContenderDescription) throws Exception {
return new ZooKeeperLeaderElectionDriver(
client, latchPath, leaderPath, leaderEventHandler, fatalErrorHandler, leaderContenderDescription);
}
This returns a ZooKeeperLeaderElectionDriver:
public ZooKeeperLeaderElectionDriver(
        CuratorFramework client,
        String latchPath,
        String leaderPath,
        LeaderElectionEventHandler leaderElectionEventHandler,
        FatalErrorHandler fatalErrorHandler,
        String leaderContenderDescription) throws Exception {

    this.client = checkNotNull(client);
    this.leaderPath = checkNotNull(leaderPath);
    this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
    this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    this.leaderContenderDescription = checkNotNull(leaderContenderDescription);

    leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
    cache = new NodeCache(client, leaderPath);

    client.getUnhandledErrorListenable().addListener(this);

    running = true;

    leaderLatch.addListener(this);
    leaderLatch.start();

    cache.getListenable().addListener(this);
    cache.start();

    client.getConnectionStateListenable().addListener(listener);
}
Here leaderLatch.start() is the actual election; the election result is delivered asynchronously through a listener.
public void start() throws Exception {
Preconditions.checkState(this.state.compareAndSet(LeaderLatch.State.LATENT, LeaderLatch.State.STARTED), "Cannot be started more than once");
this.startTask.set(AfterConnectionEstablished.execute(this.client, new Runnable() {
public void run() {
try {
LeaderLatch.this.internalStart();
} finally {
LeaderLatch.this.startTask.set((Object)null);
}
}
}));
}
leaderLatch.addListener(this) takes a listener as its argument, and every component participating in the election must implement this listener interface, LeaderLatchListener; here this is itself the listener implementation:
public interface LeaderLatchListener {
void isLeader();
void notLeader();
}
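As a standalone illustration of how Curator drives these callbacks (independent of Flink; the ZooKeeper address and latch path below are made-up values for demonstration), a minimal LeaderLatch sketch could look like this:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical ZooKeeper address and latch path, for illustration only.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        LeaderLatch latch = new LeaderLatch(client, "/demo/leaderlatch");
        latch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                // Curator invokes this callback once this participant wins the election.
                System.out.println("Granted leadership");
            }

            @Override
            public void notLeader() {
                // Invoked when leadership is lost (e.g. session expiration).
                System.out.println("Leadership revoked");
            }
        });

        latch.start();          // joins the election asynchronously
        Thread.sleep(10_000);   // keep the process alive long enough to observe callbacks
        latch.close();
        client.close();
    }
}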
Back in Flink: after the election completes, the Curator framework calls these two methods on the LeaderLatchListener implementation (this). If the election is won, isLeader() is called:
@Override
public void isLeader() {
leaderElectionEventHandler.onGrantLeadership();
}
leaderElectionEventHandler.onGrantLeadership():
@Override
@GuardedBy("lock")
public void onGrantLeadership() {
    synchronized (lock) {
        if (running) {
            issuedLeaderSessionID = UUID.randomUUID();
            clearConfirmedLeaderInformation();

            if (LOG.isDebugEnabled()) {
                LOG.debug(
                    "Grant leadership to contender {} with session ID {}.",
                    leaderContender.getDescription(),
                    issuedLeaderSessionID);
            }

            // grant the leader role to the current component
            leaderContender.grantLeadership(issuedLeaderSessionID);
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ignoring the grant leadership notification since the {} has " +
                    "already been closed.", leaderElectionDriver);
            }
        }
    }
}
leaderContender.grantLeadership(issuedLeaderSessionID);
@Override
public void grantLeadership(final UUID leaderSessionID) {
log.info("{} was granted leadership with leaderSessionID={}", getRestBaseUrl(), leaderSessionID);
leaderElectionService.confirmLeadership(leaderSessionID, getRestBaseUrl());
}
leaderElectionService.confirmLeadership
@Override
public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
    if (LOG.isDebugEnabled()) {
        LOG.debug(
            "Confirm leader session ID {} for leader {}.",
            leaderSessionID,
            leaderAddress);
    }

    checkNotNull(leaderSessionID);

    synchronized (lock) {
        if (hasLeadership(leaderSessionID)) {   // does the current component hold leadership?
            if (running) {
                confirmLeaderInformation(leaderSessionID, leaderAddress);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Ignoring the leader session Id {} confirmation, since the " +
                        "LeaderElectionService has already been stopped.", leaderSessionID);
                }
            }
        } else {
            // Received an old confirmation call
            if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Receive an old confirmation call of leader session ID {}, " +
                        "current issued session ID is {}", leaderSessionID, issuedLeaderSessionID);
                }
            } else {
                LOG.warn("The leader session ID {} was confirmed even though the " +
                    "corresponding JobManager was not elected as the leader.", leaderSessionID);
            }
        }
    }
}
confirmLeaderInformation(leaderSessionID, leaderAddress);
@GuardedBy("lock")
private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
confirmedLeaderSessionID = leaderSessionID;
confirmedLeaderAddress = leaderAddress;
leaderElectionDriver.writeLeaderInformation(
LeaderInformation.known(confirmedLeaderSessionID, confirmedLeaderAddress));
}
In other words, once the component is confirmed as leader, its information is written into ZooKeeper. The rough logic of the ZooKeeper write:
try {
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    final ObjectOutputStream oos = new ObjectOutputStream(baos);

    oos.writeUTF(leaderInformation.getLeaderAddress());
    oos.writeObject(leaderInformation.getLeaderSessionID());

    oos.close();

    boolean dataWritten = false;

    while (!dataWritten && leaderLatch.hasLeadership()) {
        Stat stat = client.checkExists().forPath(leaderPath);

        if (stat != null) {
            long owner = stat.getEphemeralOwner();
            long sessionID = client.getZookeeperClient().getZooKeeper().getSessionId();

            if (owner == sessionID) {
                try {
                    client.setData().forPath(leaderPath, baos.toByteArray());
                    dataWritten = true;
                } catch (KeeperException.NoNodeException noNode) {
                    // node was deleted in the meantime
                }
            } else {
                try {
                    client.delete().forPath(leaderPath);
                } catch (KeeperException.NoNodeException noNode) {
                    // node was deleted in the meantime --> try again
                }
            }
        } else {
            try {
                client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
                    leaderPath, baos.toByteArray());

                dataWritten = true;
            } catch (KeeperException.NodeExistsException nodeExists) {
                // node has been created in the meantime --> try again
            }
        }
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("Successfully wrote leader information: {}.", leaderInformation);
    }
}
At this point the component has written its own information into ZooKeeper. If the write succeeds, it has obtained leadership, and clients can also discover the server's address through ZooKeeper. The election process of the other components described below follows essentially the same flow.
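For completeness, the client side of this service discovery essentially reads that same znode back and deserializes it in the order it was written (address first, then session ID). A rough sketch of such a read with Curator; the method name and leaderPath are hypothetical, and this is not Flink's actual retrieval code:

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.UUID;
import org.apache.curator.framework.CuratorFramework;

public class LeaderInfoReader {
    // 'client' is an already-started CuratorFramework; 'leaderPath' is the leader znode path.
    static void printLeaderInfo(CuratorFramework client, String leaderPath) throws Exception {
        byte[] data = client.getData().forPath(leaderPath);
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            // Must be read in the same order it was written: address first, then session ID.
            String leaderAddress = ois.readUTF();
            UUID leaderSessionId = (UUID) ois.readObject();
            System.out.println("leader address = " + leaderAddress
                    + ", session id = " + leaderSessionId);
        }
    }
}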
Next, the second component is created and started: the ResourceManager.
resourceManager = resourceManagerFactory.createResourceManager(
configuration,
ResourceID.generate(),
rpcService,
highAvailabilityServices,
heartbeatServices,
fatalErrorHandler,
new ClusterInformation(hostname, blobServer.getPort()),
webMonitorEndpoint.getRestBaseUrl(),
metricRegistry,
hostname,
ioExecutor);
// ... (the Dispatcher component is created further down)
log.debug("Starting ResourceManager.");
resourceManager.start(); // start it
Once start() has been called here, the component sends a message to itself, which eventually causes its own onStart() method to run.
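To make this "tell yourself a START message" pattern concrete, here is a minimal sketch using the classic Akka actor API, which is what the AkkaRpcService builds on. This is not Flink code; the actor, enum and names are made up for illustration:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class TellSelfStartDemo {
    enum ControlMessages { START }

    static class DemoEndpoint extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .matchEquals(ControlMessages.START, msg -> onStart())
                    .build();
        }

        private void onStart() {
            // Runs inside the actor's mailbox thread, analogous to RpcEndpoint#onStart.
            System.out.println("endpoint started");
        }
    }

    public static void main(String[] args) throws Exception {
        ActorSystem system = ActorSystem.create("demo");
        ActorRef endpoint = system.actorOf(Props.create(DemoEndpoint.class), "demo-endpoint");
        // Equivalent of rpcServer.start(): a fire-and-forget control message to itself.
        endpoint.tell(ControlMessages.START, ActorRef.noSender());
        Thread.sleep(1000);   // give the actor time to process the message
        system.terminate();
    }
}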
The code that creates the ResourceManager:
public ResourceManager<T> createResourceManager(
        Configuration configuration,
        ResourceID resourceId,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        FatalErrorHandler fatalErrorHandler,
        ClusterInformation clusterInformation,
        @Nullable String webInterfaceUrl,
        MetricRegistry metricRegistry,
        String hostname,
        Executor ioExecutor) throws Exception {

    final ResourceManagerMetricGroup resourceManagerMetricGroup = ResourceManagerMetricGroup.create(metricRegistry, hostname);
    final SlotManagerMetricGroup slotManagerMetricGroup = SlotManagerMetricGroup.create(metricRegistry, hostname);

    // 1. create the ResourceManagerRuntimeServices
    final ResourceManagerRuntimeServices resourceManagerRuntimeServices = createResourceManagerRuntimeServices(
        configuration, rpcService, highAvailabilityServices, slotManagerMetricGroup);

    // 2. create the ResourceManager itself
    return createResourceManager(
        configuration,
        resourceId,
        rpcService,
        highAvailabilityServices,
        heartbeatServices,
        fatalErrorHandler,
        clusterInformation,
        webInterfaceUrl,
        resourceManagerMetricGroup,
        resourceManagerRuntimeServices,
        ioExecutor);
}
There are two important calls here:
(1) createResourceManagerRuntimeServices
(2) createResourceManager
(1) createResourceManagerRuntimeServices:
private ResourceManagerRuntimeServices createResourceManagerRuntimeServices(
Configuration configuration,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
SlotManagerMetricGroup slotManagerMetricGroup) throws ConfigurationException {
return ResourceManagerRuntimeServices.fromConfiguration(
createResourceManagerRuntimeServicesConfiguration(configuration),
highAvailabilityServices,
rpcService.getScheduledExecutor(),
slotManagerMetricGroup);
}
fromConfiguration:
public static ResourceManagerRuntimeServices fromConfiguration(
ResourceManagerRuntimeServicesConfiguration configuration,
HighAvailabilityServices highAvailabilityServices,
ScheduledExecutor scheduledExecutor,
SlotManagerMetricGroup slotManagerMetricGroup) {
//1. create a SlotManager
final SlotManager slotManager = createSlotManager(configuration, scheduledExecutor, slotManagerMetricGroup);
//2. create a JobLeaderIdService
final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
highAvailabilityServices,
scheduledExecutor,
configuration.getJobTimeout());
//return a ResourceManagerRuntimeServices that wraps the SlotManager and the JobLeaderIdService
return new ResourceManagerRuntimeServices(slotManager, jobLeaderIdService);
}
So this step mainly creates two services for the ResourceManager: the SlotManager and the JobLeaderIdService.
(2) createResourceManager; the concrete factory implementation here is StandaloneResourceManagerFactory:
protected ResourceManager<ResourceID> createResourceManager(
        Configuration configuration,
        ResourceID resourceId,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        FatalErrorHandler fatalErrorHandler,
        ClusterInformation clusterInformation,
        @Nullable String webInterfaceUrl,
        ResourceManagerMetricGroup resourceManagerMetricGroup,
        ResourceManagerRuntimeServices resourceManagerRuntimeServices,
        Executor ioExecutor) {

    final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration);

    return new StandaloneResourceManager(
        rpcService,
        resourceId,
        highAvailabilityServices,
        heartbeatServices,
        resourceManagerRuntimeServices.getSlotManager(),
        ResourceManagerPartitionTrackerImpl::new,
        resourceManagerRuntimeServices.getJobLeaderIdService(),
        clusterInformation,
        fatalErrorHandler,
        resourceManagerMetricGroup,
        standaloneClusterStartupPeriodTime,
        AkkaUtils.getTimeoutAsTime(configuration),
        ioExecutor);
}
After the ResourceManager has been created, it is started via resourceManager.start():
// ------------------------------------------------------------------------
// Start & shutdown & lifecycle callbacks
// ------------------------------------------------------------------------
/**
* Triggers start of the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready
* to process remote procedure calls.
*/
public final void start() {
rpcServer.start();
}
RpcServer's start() is a framework-level method; it ends up calling the start() of the subclass AkkaInvocationHandler:
class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer
@Override
public void start() {
rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
}
This rpcEndpoint is an ActorRef, and rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender()) sends a START message to itself. Processing that message leads to StandaloneResourceManager's onStart() method, which is actually the onStart() inherited from its parent class ResourceManager:
@Override
public final void onStart() throws Exception {
try {
startResourceManagerServices();
} catch (Throwable t) {
final ResourceManagerException exception = new ResourceManagerException(String.format("Could not start the ResourceManager %s", getAddress()), t);
onFatalError(exception);
throw exception;
}
}
private void startResourceManagerServices() throws Exception {
try {
//leader election service: writes this component's information into ZooKeeper
leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
initialize();
leaderElectionService.start(this);
jobLeaderIdService.start(new JobLeaderIdActionsImpl());
registerTaskExecutorMetrics();
} catch (Exception e) {
handleStartResourceManagerServicesException(e);
}
}
The whole election process is the same as for the REST service above. Eventually it reaches:
public void onGrantLeadership() {
    synchronized (lock) {
        if (running) {
            issuedLeaderSessionID = UUID.randomUUID();
            clearConfirmedLeaderInformation();

            if (LOG.isDebugEnabled()) {
                LOG.debug(
                    "Grant leadership to contender {} with session ID {}.",
                    leaderContender.getDescription(),
                    issuedLeaderSessionID);
            }

            // this is where the contender is called
            leaderContender.grantLeadership(issuedLeaderSessionID);
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ignoring the grant leadership notification since the {} has " +
                    "already been closed.", leaderElectionDriver);
            }
        }
    }
}
which finally calls the ResourceManager's grantLeadership method:
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
    final CompletableFuture<Boolean> acceptLeadershipFuture = clearStateFuture
        .thenComposeAsync((ignored) -> tryAcceptLeadership(newLeaderSessionID), getUnfencedMainThreadExecutor());

    final CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture.thenAcceptAsync(
        (acceptLeadership) -> {
            if (acceptLeadership) {
                // confirming the leader session ID might be blocking,
                leaderElectionService.confirmLeadership(newLeaderSessionID, getAddress());
            }
        },
        ioExecutor);

    confirmationFuture.whenComplete(
        (Void ignored, Throwable throwable) -> {
            if (throwable != null) {
                onFatalError(ExceptionUtils.stripCompletionException(throwable));
            }
        });
}
There are two important parts here. First, look at tryAcceptLeadership(newLeaderSessionID), which runs on the unfenced main-thread executor:
private CompletableFuture<Boolean> tryAcceptLeadership(final UUID newLeaderSessionID) {
    if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
        final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);

        log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);

        // clear the state if we've been the leader before
        if (getFencingToken() != null) {
            clearStateInternal();
        }

        setFencingToken(newResourceManagerId);

        // only the ResourceManager that became leader executes this and starts its services
        startServicesOnLeadership();

        return prepareLeadershipAsync().thenApply(ignored -> true);
    } else {
        return CompletableFuture.completedFuture(false);
    }
}
startServicesOnLeadership(): only the ResourceManager that has become leader executes this method. It starts two heartbeat services and two periodic tasks.
private void startServicesOnLeadership() {
    // start the heartbeat services
    startHeartbeatServices();

    // start the SlotManager, which starts two periodic tasks:
    // 1. check whether TaskExecutors are still alive (no heartbeat received for 50s)
    // 2. check pending slot requests; if a request is not fulfilled before the timeout, it is dropped
    slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());

    onLeadership();
}
1. Starting the heartbeat services:
private void startHeartbeatServices() {
    // heartbeat with the TaskManagers
    taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
        resourceId,
        new TaskManagerHeartbeatListener(),
        getMainThreadExecutor(),
        log);

    // heartbeat with the JobManagers
    jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
        resourceId,
        new JobManagerHeartbeatListener(),
        getMainThreadExecutor(),
        log);
}
createHeartbeatManagerSender is defined in HeartbeatServices. There is a similar method, createHeartbeatManager; the difference is that the "sender" variant is the active side that initiates heartbeat requests.
public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
ResourceID resourceId,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor mainThreadExecutor,
Logger log) {
return new HeartbeatManagerSenderImpl<>(
heartbeatInterval,
heartbeatTimeout,
resourceId,
heartbeatListener,
mainThreadExecutor,
log);
}
HeartbeatManagerSenderImpl(
        long heartbeatPeriod,
        long heartbeatTimeout,
        ResourceID ownResourceID,
        HeartbeatListener<I, O> heartbeatListener,
        ScheduledExecutor mainThreadExecutor,
        Logger log) {
    this(
        heartbeatPeriod,
        heartbeatTimeout,
        ownResourceID,
        heartbeatListener,
        mainThreadExecutor,
        log,
        new HeartbeatMonitorImpl.Factory<>());
}

HeartbeatManagerSenderImpl(
        long heartbeatPeriod,
        long heartbeatTimeout,
        ResourceID ownResourceID,
        HeartbeatListener<I, O> heartbeatListener,
        ScheduledExecutor mainThreadExecutor,
        Logger log,
        HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {
    super(
        heartbeatTimeout,
        ownResourceID,
        heartbeatListener,
        mainThreadExecutor,
        log,
        heartbeatMonitorFactory);

    this.heartbeatPeriod = heartbeatPeriod;
    mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
}
mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS) is the periodic scheduling: it schedules the current object (this, itself a Runnable) for execution:
@Override
public void run() {
if (!stopped) {
log.debug("Trigger heartbeat request.");
for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
requestHeartbeat(heartbeatMonitor);
}
//scheduled once at startup, then rescheduled after each heartbeatPeriod, i.e. every 10s
getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
}
}
Each HeartbeatMonitor here represents one heartbeat target, i.e. a TaskExecutor.
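The scheduling pattern used here, where each run re-schedules itself after heartbeatPeriod instead of using a fixed-rate schedule, can be reproduced with a plain ScheduledExecutorService. A minimal sketch, not Flink code; the class name is made up and the 10s period just mirrors the default heartbeat interval mentioned above:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SelfReschedulingTask implements Runnable {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final long periodMillis = 10_000L;     // mirrors the default heartbeat interval
    private volatile boolean stopped = false;

    void start() {
        // First run immediately, like schedule(this, 0L, MILLISECONDS) in the sender implementation.
        executor.schedule(this, 0L, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        if (!stopped) {
            System.out.println("trigger heartbeat requests to all monitored targets");
            // Re-schedule only after the current round has finished, so rounds never overlap.
            executor.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
        }
    }

    void stop() {
        stopped = true;
        executor.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        SelfReschedulingTask task = new SelfReschedulingTask();
        task.start();
        Thread.sleep(25_000);
        task.stop();
    }
}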
Next, look at slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl()):
@Override
public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
    LOG.info("Starting the SlotManager.");

    this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
    mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
    resourceActions = Preconditions.checkNotNull(newResourceActions);

    started = true;

    // first periodic task
    taskManagerTimeoutsAndRedundancyCheck = scheduledExecutor.scheduleWithFixedDelay(
        () -> mainThreadExecutor.execute(
            () -> checkTaskManagerTimeoutsAndRedundancy()),
        0L,
        taskManagerTimeout.toMilliseconds(),
        TimeUnit.MILLISECONDS);

    // second periodic task
    slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
        () -> mainThreadExecutor.execute(
            () -> checkSlotRequestTimeouts()),
        0L,
        slotRequestTimeout.toMilliseconds(),
        TimeUnit.MILLISECONDS);

    registerSlotManagerMetrics();
}
The first periodic task, checkTaskManagerTimeoutsAndRedundancy, checks which TaskManagers have timed out (by default the heartbeat interval is 10s, this check runs every 30s, and the heartbeat timeout is 50s).
void checkTaskManagerTimeoutsAndRedundancy() {
    if (!taskManagerRegistrations.isEmpty()) {
        long currentTime = System.currentTimeMillis();

        ArrayList<TaskManagerRegistration> timedOutTaskManagers = new ArrayList<>(taskManagerRegistrations.size());

        // first retrieve the timed out TaskManagers
        for (TaskManagerRegistration taskManagerRegistration : taskManagerRegistrations.values()) {
            if (currentTime - taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) {
                // we collect the instance ids first in order to avoid concurrent modifications by the
                // ResourceActions.releaseResource call
                timedOutTaskManagers.add(taskManagerRegistration);
            }
        }

        int slotsDiff = redundantTaskManagerNum * numSlotsPerWorker - freeSlots.size();
        if (freeSlots.size() == slots.size()) {
            // No need to keep redundant taskManagers if no job is running.
            releaseTaskExecutors(timedOutTaskManagers, timedOutTaskManagers.size());
        } else if (slotsDiff > 0) {
            // Keep enough redundant taskManagers from time to time.
            int requiredTaskManagers = MathUtils.divideRoundUp(slotsDiff, numSlotsPerWorker);
            allocateRedundantTaskManagers(requiredTaskManagers);
        } else {
            // second we trigger the release resource callback which can decide upon the resource release
            int maxReleaseNum = (-slotsDiff) / numSlotsPerWorker;
            releaseTaskExecutors(timedOutTaskManagers, Math.min(maxReleaseNum, timedOutTaskManagers.size()));
        }
    }
}
The second periodic task, checkSlotRequestTimeouts, checks which slot requests have timed out; the default timeout is 5 minutes.
private void checkSlotRequestTimeouts() {
    if (!pendingSlotRequests.isEmpty()) {
        long currentTime = System.currentTimeMillis();

        Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator();

        while (slotRequestIterator.hasNext()) {
            PendingSlotRequest slotRequest = slotRequestIterator.next().getValue();

            if (currentTime - slotRequest.getCreationTimestamp() >= slotRequestTimeout.toMilliseconds()) {
                slotRequestIterator.remove();

                if (slotRequest.isAssigned()) {
                    cancelPendingSlotRequest(slotRequest);
                }

                resourceActions.notifyAllocationFailure(
                    slotRequest.getJobId(),
                    slotRequest.getAllocationId(),
                    new TimeoutException("The allocation could not be fulfilled in time."));
            }
        }
    }
}
So when the ResourceManager wins the election it starts two heartbeat services and two periodic tasks. Now for the third component, the Dispatcher:
//inside the create method the dispatcher is created and started via its start method
log.debug("Starting Dispatcher.");
dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
highAvailabilityServices.getDispatcherLeaderElectionService(),
fatalErrorHandler,
new HaServicesJobGraphStoreFactory(highAvailabilityServices),
ioExecutor,
rpcService,
partialDispatcherServices);
1. The dispatcherRunnerFactory.createDispatcherRunner method:
/**
 * 1. create the dispatcher
 * 2. start the dispatcher
 */
@Override
public DispatcherRunner createDispatcherRunner(
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler,
        JobGraphStoreFactory jobGraphStoreFactory,
        Executor ioExecutor,
        RpcService rpcService,
        PartialDispatcherServices partialDispatcherServices) throws Exception {

    final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory = dispatcherLeaderProcessFactoryFactory.createFactory(
        jobGraphStoreFactory,
        ioExecutor,
        rpcService,
        partialDispatcherServices,
        fatalErrorHandler);

    return DefaultDispatcherRunner.create(
        leaderElectionService,
        fatalErrorHandler,
        dispatcherLeaderProcessFactory);
}
First a factory is created:
public DispatcherLeaderProcessFactory createFactory(
        JobGraphStoreFactory jobGraphStoreFactory,
        Executor ioExecutor,
        RpcService rpcService,
        PartialDispatcherServices partialDispatcherServices,
        FatalErrorHandler fatalErrorHandler) {

    final AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory dispatcherGatewayServiceFactory =
        new DefaultDispatcherGatewayServiceFactory(
            dispatcherFactory,
            rpcService,
            partialDispatcherServices);

    return new SessionDispatcherLeaderProcessFactory(
        dispatcherGatewayServiceFactory,
        jobGraphStoreFactory,
        ioExecutor,
        fatalErrorHandler);
}
The SessionDispatcherLeaderProcessFactory is used to create a SessionDispatcherLeaderProcess; this factory is handed over as the DispatcherLeaderProcessFactory.
Then the create method is called:
public static DispatcherRunner create(
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler,
DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory) throws Exception {
//create the DefaultDispatcherRunner
final DefaultDispatcherRunner dispatcherRunner = new DefaultDispatcherRunner(
leaderElectionService,
fatalErrorHandler,
dispatcherLeaderProcessFactory);
//start the life cycle of the DefaultDispatcherRunner
return DispatcherRunnerLeaderElectionLifecycleManager.createFor(dispatcherRunner, leaderElectionService);
}
public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
return new DispatcherRunnerLeaderElectionLifecycleManager<>(dispatcherRunner, leaderElectionService);
}
The leader election service is passed to this method as the leaderElectionService parameter:
private DispatcherRunnerLeaderElectionLifecycleManager(T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
this.dispatcherRunner = dispatcherRunner;
this.leaderElectionService = leaderElectionService;
leaderElectionService.start(dispatcherRunner);
}
Inside, the election is started: leaderElectionService.start(dispatcherRunner):
public final void start(LeaderContender contender) throws Exception {
checkNotNull(contender, "Contender must not be null.");
Preconditions.checkState(leaderContender == null, "Contender was already set.");
synchronized (lock) {
leaderContender = contender;
leaderElectionDriver = leaderElectionDriverFactory.createLeaderElectionDriver(
this, new LeaderElectionFatalErrorHandler(), leaderContender.getDescription());
LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
running = true;
}
}
Here the leaderContender is the dispatcherRunner. The flow again goes through the ZooKeeperLeaderElectionDriver constructor shown earlier, which starts the LeaderLatch and registers the listeners.
When the election succeeds, Curator calls back the isLeader() method of ZooKeeperLeaderElectionDriver:
@Override
public void isLeader() {
leaderElectionEventHandler.onGrantLeadership();
}
and that in turn calls the grantLeadership method of DefaultDispatcherRunner:
@Override
public void grantLeadership(UUID leaderSessionID) {
runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID));
}
which calls startNewDispatcherLeaderProcess:
private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
    // stop the old instance
    stopDispatcherLeaderProcess();

    // create a new instance
    dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);

    // start the new instance
    final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
    FutureUtils.assertNoException(
        previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
}
The start method, newDispatcherLeaderProcess::start, is the start method of AbstractDispatcherLeaderProcess:
@Override
public final void start() {
runIfStateIs(
State.CREATED,
this::startInternal);
}
private void startInternal() {
log.info("Start {}.", getClass().getSimpleName());
state = State.RUNNING;
onStart();
}
which in turn calls the onStart method of SessionDispatcherLeaderProcess:
@Override
protected void onStart() {
    startServices();

    onGoingRecoveryOperation = recoverJobsAsync()        // fetch all persisted JobGraph objects
        .thenAccept(this::createDispatcherIfRunning)     // create a Dispatcher (if still running) and hand it the recovered jobs
        .handle(this::onErrorIfRunning);
}
1. startServices(): starts the jobGraphStore, which is used to store JobGraphs:
private void startServices() {
try {
jobGraphStore.start(this);
} catch (Exception e) {
throw new FlinkRuntimeException(
String.format(
"Could not start %s when trying to start the %s.",
jobGraphStore.getClass().getSimpleName(),
getClass().getSimpleName()),
e);
}
}
2. recoverJobsAsync(): asynchronously recovers the jobs that still need to be executed. It only fetches all the JobGraphs that need to be recovered; the actual recovery is triggered by the subsequent .thenAccept(this::createDispatcherIfRunning):
private CompletableFuture<Collection<JobGraph>> recoverJobsAsync() {
return CompletableFuture.supplyAsync(
this::recoverJobsIfRunning,
ioExecutor);
}
This calls recoverJobsIfRunning:
private Collection<JobGraph> recoverJobsIfRunning() {
return supplyUnsynchronizedIfRunning(this::recoverJobs).orElse(Collections.emptyList());
}
The method that recovers the jobs, recoverJobs:
private Collection<JobGraph> recoverJobs() {
log.info("Recover all persisted job graphs.");
final Collection<JobID> jobIds = getJobIds();
final Collection<JobGraph> recoveredJobGraphs = new ArrayList<>();
for (JobID jobId : jobIds) {
recoveredJobGraphs.add(recoverJob(jobId));
}
log.info("Successfully recovered {} persisted job graphs.", recoveredJobGraphs.size());
return recoveredJobGraphs;
}
It fetches all job IDs and then recovers each job by its ID:
private Collection<JobID> getJobIds() {
try {
return jobGraphStore.getJobIds();
} catch (Exception e) {
throw new FlinkRuntimeException(
"Could not retrieve job ids of persisted jobs.",
e);
}
}
recoverJob is the method that actually recovers a job:
private JobGraph recoverJob(JobID jobId) {
log.info("Trying to recover job with job id {}.", jobId);
try {
return jobGraphStore.recoverJobGraph(jobId);
} catch (Exception e) {
throw new FlinkRuntimeException(
String.format("Could not recover job with job id %s.", jobId),
e);
}
}
The implementation is DefaultJobGraphStore.recoverJobGraph, which retrieves the JobGraph:
public JobGraph recoverJobGraph(JobID jobId) throws Exception {
    checkNotNull(jobId, "Job ID");

    LOG.debug("Recovering job graph {} from {}.", jobId, jobGraphStateHandleStore);

    final String name = jobGraphStoreUtil.jobIDToName(jobId);

    synchronized (lock) {
        verifyIsRunning();

        boolean success = false;

        RetrievableStateHandle<JobGraph> jobGraphRetrievableStateHandle;

        try {
            try {
                jobGraphRetrievableStateHandle = jobGraphStateHandleStore.getAndLock(name);
            } catch (StateHandleStore.NotExistException ignored) {
                success = true;
                return null;
            } catch (Exception e) {
                throw new FlinkException("Could not retrieve the submitted job graph state handle " +
                    "for " + name + " from the submitted job graph store.", e);
            }

            JobGraph jobGraph;

            try {
                jobGraph = jobGraphRetrievableStateHandle.retrieveState();
            } catch (ClassNotFoundException cnfe) {
                throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + name +
                    ". This indicates that you are trying to recover from state written by an " +
                    "older Flink version which is not compatible. Try cleaning the state handle store.", cnfe);
            } catch (IOException ioe) {
                throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + name +
                    ". This indicates that the retrieved state handle is broken. Try cleaning the state handle " +
                    "store.", ioe);
            }

            addedJobGraphs.add(jobGraph.getJobID());

            LOG.info("Recovered {}.", jobGraph);

            success = true;
            return jobGraph;
        } finally {
            if (!success) {
                jobGraphStateHandleStore.release(name);
            }
        }
    }
}
Now that the JobGraphs to be recovered have been fetched, .thenAccept(this::createDispatcherIfRunning) is executed:
private void createDispatcherIfRunning(Collection<JobGraph> jobGraphs) {
runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs));
}
A Dispatcher is then created to schedule the recovered JobGraphs:
private void createDispatcher(Collection<JobGraph> jobGraphs) {
final DispatcherGatewayService dispatcherService = dispatcherGatewayServiceFactory.create(
DispatcherId.fromUuid(getLeaderSessionId()),
jobGraphs,
jobGraphStore);
completeDispatcherSetup(dispatcherService);
}
The recovered JobGraphs are actually started inside dispatcherGatewayServiceFactory.create:
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
        DispatcherId fencingToken,
        Collection<JobGraph> recoveredJobs,
        JobGraphWriter jobGraphWriter) {

    final Dispatcher dispatcher;
    try {
        // what is returned here is a StandaloneDispatcher
        dispatcher = dispatcherFactory.createDispatcher(
            rpcService,
            fencingToken,
            recoveredJobs,
            (dispatcherGateway, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(),
            PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter));
    } catch (Exception e) {
        throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
    }

    dispatcher.start();

    return DefaultDispatcherGatewayService.from(dispatcher);
}
To summarize the whole flow:
onStart()
    -> recoverJobsAsync()                (recover all persisted JobGraphs)
    -> createDispatcherIfRunning()
    -> runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs));
    -> createDispatcher(jobGraphs)       (create the Dispatcher and hand it the recovered JobGraphs)
In other words, a Dispatcher is created that takes over the recovered JobGraphs and schedules them for execution.
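This recover-then-dispatch chain is plain CompletableFuture composition: recover the graphs on the I/O executor, hand them to the dispatcher-creation step, and handle errors at the end. A simplified, Flink-free sketch of the same pattern (all names are made up for illustration):

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RecoverAndDispatchDemo {
    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2);

        CompletableFuture<Void> operation = CompletableFuture
                // recoverJobsAsync(): load the persisted job graphs on the I/O executor
                .supplyAsync(() -> Arrays.asList("jobGraph-1", "jobGraph-2"), ioExecutor)
                // createDispatcherIfRunning(): hand all recovered graphs to one dispatcher
                .thenAccept(RecoverAndDispatchDemo::createDispatcher)
                // onErrorIfRunning(): surface any failure from the previous stages
                .handle((ignored, throwable) -> {
                    if (throwable != null) {
                        System.err.println("recovery failed: " + throwable);
                    }
                    return null;
                });

        operation.join();
        ioExecutor.shutdown();
    }

    private static void createDispatcher(List<String> jobGraphs) {
        System.out.println("creating a dispatcher for " + jobGraphs.size() + " recovered job graphs");
    }
}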
The createDispatcher call returns a StandaloneDispatcher:
@Override
public StandaloneDispatcher createDispatcher(
RpcService rpcService,
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
DispatcherBootstrapFactory dispatcherBootstrapFactory,
PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception {
// create the default dispatcher
return new StandaloneDispatcher(
rpcService,
fencingToken,
recoveredJobs,
dispatcherBootstrapFactory,
DispatcherServices.from(partialDispatcherServicesWithJobGraphStore, DefaultJobManagerRunnerFactory.INSTANCE));
}
The dispatcher's start() again goes through rpcServer.start() and thus ends up in onStart(), i.e. the onStart method of StandaloneDispatcher:
public final void start() {
rpcServer.start();
}
What actually runs is the onStart method of the parent class Dispatcher:
@Override
public void onStart() throws Exception {
    try {
        startDispatcherServices();
    } catch (Throwable t) {
        final DispatcherException exception = new DispatcherException(
            String.format("Could not start the Dispatcher %s", getAddress()), t);
        onFatalError(exception);
        throw exception;
    }

    startRecoveredJobs();

    this.dispatcherBootstrap = this.dispatcherBootstrapFactory.create(
        getSelfGateway(DispatcherGateway.class),
        this.getRpcService().getScheduledExecutor(),
        this::onFatalError);
}
startRecoveredJobs() schedules the recovered jobs for execution:
private void startRecoveredJobs() {
for (JobGraph recoveredJob : recoveredJobs) {
runRecoveredJob(recoveredJob);
}
recoveredJobs.clear();
}
private void runRecoveredJob(final JobGraph recoveredJob) {
checkNotNull(recoveredJob);
try {
runJob(recoveredJob, ExecutionType.RECOVERY);
} catch (Throwable throwable) {
onFatalError(new DispatcherException(String.format("Could not start recovered job %s.", recoveredJob.getJobID()), throwable));
}
}
At this point the jobs are running; the execution type is ExecutionType.RECOVERY.
After the jobs have been recovered, completeDispatcherSetup is called to register some callbacks:
final void completeDispatcherSetup(DispatcherGatewayService dispatcherService) {
runIfStateIs(
State.RUNNING,
() -> completeDispatcherSetupInternal(dispatcherService));
}
private void completeDispatcherSetupInternal(DispatcherGatewayService createdDispatcherService) {
Preconditions.checkState(dispatcherService == null, "The DispatcherGatewayService can only be set once.");
dispatcherService = createdDispatcherService;
dispatcherGatewayFuture.complete(createdDispatcherService.getGateway());
FutureUtils.forward(createdDispatcherService.getShutDownFuture(), shutDownFuture);
handleUnexpectedDispatcherServiceTermination(createdDispatcherService);
}
At this point, the three most important components of the JobManager have been started. To recap what each of them did:
WebMonitorEndpoint:
- Initialize a large set of Handlers and a Router, sort and de-duplicate the Handlers, then register each Handler with the Router
- Start a Netty server
- Start the internal services and run the leader election: the WebMonitorEndpoint is itself a LeaderContender; if it wins the election, isLeader() is called back
- Winning the election simply means writing the WebMonitorEndpoint's address and the ZooKeeper session ID into the znode
- Start a periodic task that cleans up the ExecutionGraph cache
ResourceManager:
1. ResourceManager is a subclass of RpcEndpoint, so after the ResourceManager object has been constructed, start() is called on this RpcEndpoint, which eventually jumps to its onStart() method.
2. ResourceManager is also a LeaderContender; it takes part in the election through the LeaderElectionService, and if it wins, isLeader() is called back.
3. It starts the services the ResourceManager needs:
Two heartbeat services:
(1) the heartbeat between the ResourceManager and the TaskExecutors
(2) the heartbeat between the ResourceManager and the JobMasters
Two periodic tasks:
(1) checkTaskManagerTimeoutsAndRedundancy(), which checks for TaskExecutor timeouts
(2) checkSlotRequestTimeouts(), which checks for SlotRequest timeouts
Dispatcher:
1. Start the JobGraphStore service.
2. Recover the jobs from the JobGraphStore and run them, which requires starting the Dispatcher.
With that, the JobManager startup is complete!