
005. Flink Source Code Analysis: Starting the JobManager

JobManager Overview

The JobManager is the master node of a Flink cluster and contains several major components:
1. ResourceManager
Flink's cluster resource manager; there is only one per cluster, and all work around managing and allocating slots goes through it.
2. Dispatcher
Receives the JobGraph submitted by the user and then starts a JobMaster for it, similar to the AppMaster role in a YARN cluster and the Driver role in a Spark job.
3. WebMonitorEndpoint
Maintains a large number of Handlers. When a client submits a job to the Flink cluster via flink run, the submission is ultimately received by the WebMonitorEndpoint, which decides which Handler should process it.
4. JobMaster/JobManager
Responsible for executing one concrete job; within one cluster several of them may be running at the same time, similar to the AppMaster role in a YARN cluster and the Driver role in a Spark job.

On the different meanings of "JobManager"

1. If we view Flink as a master-slave architecture, then "JobManager" refers to the master node, which contains the three roles described above.
2. When we submit a job to YARN, it can in fact run as a small dedicated cluster whose master node is also a JobManager. Submitting to YARN comes in two modes, per-job and session, with the containers hosting the JobManager and the StreamTasks respectively: Container(JobManager), Container(StreamTask).

In short, the master node of a Flink cluster runs the ResourceManager and the Dispatcher. When a client submits a job to the cluster (the client first builds the job into a JobGraph object), the Dispatcher is responsible for spinning up a JobManager/JobMaster that drives the execution of the tasks inside that job, and the JobManager requests the resources needed to run those tasks from the ResourceManager.

Following the analysis in the previous article, the main entry class for starting the JobManager is StandaloneSessionClusterEntrypoint:

// Entry point
StandaloneSessionClusterEntrypoint.main()
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
clusterEntrypoint.startCluster();
runCluster(configuration, pluginManager);
// Step 1: initialize the various services (7 of them)
initializeServices(configuration, pluginManager);
// Create the DispatcherResourceManagerComponentFactory, which initializes the
// factory instances of the individual components. It holds three important members:
// - the factory that creates the ResourceManager
// - the factory that creates the Dispatcher
// - the factory that creates the WebMonitorEndpoint
createDispatcherResourceManagerComponentFactory(configuration);
// Create the components the cluster needs at runtime: Dispatcher, ResourceManager, etc.
// - create the ResourceManager
// - create the Dispatcher
// - create the WebMonitorEndpoint
clusterComponent =
dispatcherResourceManagerComponentFactory.create(...)

Step 1, initializeServices(), initializes quite a few service components:

// Initialize and start the AkkaRpcService, which internally wraps an ActorSystem
commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService(...)
// Initialize a thread pool dedicated to IO
ioExecutor = Executors.newFixedThreadPool(...)
// Initialize the HA service component; the ZooKeeper-based implementation is ZooKeeperHaServices
haServices = createHaServices(configuration, ioExecutor);
// Initialize the BlobServer (server side)
blobServer = new BlobServer(configuration, haServices.createBlobStore());
blobServer.start();
// Initialize the heartbeat service component: heartbeatServices = HeartbeatServices
heartbeatServices = createHeartbeatServices(configuration);
// Initialize a store for ExecutionGraphs; the implementation is
// FileArchivedExecutionGraphStore
archivedExecutionGraphStore = createSerializableExecutionGraphStore(...)

Step 2, createDispatcherResourceManagerComponentFactory(configuration), initializes the factory instances for several components:

1. DispatcherRunnerFactory, default implementation: DefaultDispatcherRunnerFactory
2. ResourceManagerFactory, default implementation: StandaloneResourceManagerFactory
3. RestEndpointFactory, default implementation: SessionRestEndpointFactory

The DispatcherRunnerFactory internally also instantiates a SessionDispatcherLeaderProcessFactoryFactory component.
The code that creates the three factories:

final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);

Stepping into createDispatcherResourceManagerComponentFactory:

protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
   return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(StandaloneResourceManagerFactory.getInstance());
}
// which calls the method below
public static DefaultDispatcherResourceManagerComponentFactory createSessionComponentFactory(
      ResourceManagerFactory<?> resourceManagerFactory) {
   return new DefaultDispatcherResourceManagerComponentFactory(
      DefaultDispatcherRunnerFactory.createSessionRunner(SessionDispatcherFactory.INSTANCE), // the SessionDispatcherFactory
      resourceManagerFactory, // the resourceManagerFactory
      SessionRestEndpointFactory.INSTANCE); // the SessionRestEndpointFactory
}

With that, the three factories have been created.
Step 3, dispatcherResourceManagerComponentFactory.create(…), creates the three key components:

clusterComponent = dispatcherResourceManagerComponentFactory.create(
   configuration,
   ioExecutor,
   commonRpcService,
   haServices,
   blobServer,
   heartbeatServices,
   metricRegistry,
   archivedExecutionGraphStore,
   new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
   this);

Step into the create method.

Creating the webMonitorEndpoint

The first component to be created and started is the webMonitorEndpoint.

/**
 * If the user submits a job via flink run, the submission is ultimately handled by the
 * JobSubmitHandler inside the WebMonitorEndpoint and, once processed, handed over to the
 * Dispatcher. Create the webMonitorEndpoint; in YARN mode a MiniDispatcherRestEndpoint is created instead.
 */
webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
   configuration,
   dispatcherGatewayRetriever,
   resourceManagerGatewayRetriever,
   blobServer,
   executor,
   metricFetcher,
   highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
   fatalErrorHandler);

log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();

The code that actually creates the WebMonitorEndpoint:

@Override
public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
      Configuration configuration,
      LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
      LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
      TransientBlobService transientBlobService,
      ScheduledExecutorService executor,
      MetricFetcher metricFetcher,
      LeaderElectionService leaderElectionService,
      FatalErrorHandler fatalErrorHandler) throws Exception {
   final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);

   return new DispatcherRestEndpoint(
      RestServerEndpointConfiguration.fromConfiguration(configuration),
      dispatcherGatewayRetriever,
      configuration,
      restHandlerConfiguration,
      resourceManagerGatewayRetriever,
      transientBlobService,
      executor,
      metricFetcher,
      leaderElectionService,
      RestEndpointFactory.createExecutionGraphCache(restHandlerConfiguration),
      fatalErrorHandler);
}

Now look at webMonitorEndpoint.start(). The start method is implemented in RestServerEndpoint: webMonitorEndpoint extends RestServerEndpoint, so the actual startup logic is the parent class's start() method:

/**
 * Starts this REST server endpoint.
 *
 * @throws Exception if we cannot start the RestServerEndpoint
 */
public final void start() throws Exception {
   synchronized (lock) {
      Preconditions.checkState(state == State.CREATED, "The RestServerEndpoint cannot be restarted.");

      log.info("Starting rest endpoint.");
      // initialize a Router
      final Router router = new Router();
      final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
      // initialize the various handlers
      handlers = initializeHandlers(restAddressFuture);

      /* sort the handlers such that they are ordered the following:
       * /jobs
       * /jobs/overview
       * /jobs/:jobid
       * /jobs/:jobid/config
       * /:*
       */
      Collections.sort(
         handlers,
         RestHandlerUrlComparator.INSTANCE);
      // check that all endpoints and handlers are unique
      checkAllEndpointsAndHandlersAreUnique(handlers);
      handlers.forEach(handler -> registerHandler(router, handler, log)); // register each handler with the Router
      // standard Netty server setup
      ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {

         @Override
         protected void initChannel(SocketChannel ch) {
            RouterHandler handler = new RouterHandler(router, responseHeaders);

            // SSL should be the first handler in the pipeline
            if (isHttpsEnabled()) {
               ch.pipeline().addLast("ssl",
                  new RedirectingSslHandler(restAddress, restAddressFuture, sslHandlerFactory));
            }

            ch.pipeline()
               .addLast(new HttpServerCodec())
               .addLast(new FileUploadHandler(uploadDir))
               .addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders))
               .addLast(new ChunkedWriteHandler())
               .addLast(handler.getName(), handler)
               .addLast(new PipelineErrorHandler(log, responseHeaders));
         }
      };

      NioEventLoopGroup bossGroup = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
      NioEventLoopGroup workerGroup = new NioEventLoopGroup(0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));

      bootstrap = new ServerBootstrap();
      bootstrap
         .group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .childHandler(initializer);

      Iterator<Integer> portsIterator;
      try {
         portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
      } catch (IllegalConfigurationException e) {
         throw e;
      } catch (Exception e) {
         throw new IllegalArgumentException("Invalid port range definition: " + restBindPortRange);
      }

      int chosenPort = 0;
      while (portsIterator.hasNext()) {
         try {
            chosenPort = portsIterator.next();
            final ChannelFuture channel;
            if (restBindAddress == null) {
               channel = bootstrap.bind(chosenPort);
            } else {
               channel = bootstrap.bind(restBindAddress, chosenPort);
            }
            serverChannel = channel.syncUninterruptibly().channel();
            break;
         } catch (final Exception e) {
            // continue if the exception is due to the port being in use, fail early otherwise
            if (!(e instanceof org.jboss.netty.channel.ChannelException || e instanceof java.net.BindException)) {
               throw e;
            }
         }
      }

      if (serverChannel == null) {
         throw new BindException("Could not start rest endpoint on any port in port range " + restBindPortRange);
      }

      log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);

      final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
      final String advertisedAddress;
      if (bindAddress.getAddress().isAnyLocalAddress()) {
         advertisedAddress = this.restAddress;
      } else {
         advertisedAddress = bindAddress.getAddress().getHostAddress();
      }
      final int port = bindAddress.getPort();

      log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);

      restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();

      restAddressFuture.complete(restBaseUrl);
      // startup complete
      state = State.RUNNING;
      // call into the subclass; implementations differ per endpoint
      startInternal();
   }
}
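A detail worth pausing on in start() is the bind loop: the endpoint walks a configured port range and keeps the first port it can bind, failing fast on any error that is not "port already in use". Below is a minimal, self-contained sketch of that pattern with Netty; the 8081-8090 range and the simplified HTTP pipeline are assumptions for illustration, not Flink's actual configuration.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;

import java.net.BindException;
import java.util.Iterator;
import java.util.stream.IntStream;

public class PortRangeBindExample {
    public static void main(String[] args) throws Exception {
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap()
            .group(boss, worker)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    // Same shape as Flink's REST pipeline: HTTP codec first, then app handlers.
                    ch.pipeline()
                      .addLast(new HttpServerCodec())
                      .addLast(new HttpObjectAggregator(1024 * 1024));
                }
            });

        // Try each port in the range; move on only when the port is already in use.
        Channel serverChannel = null;
        Iterator<Integer> ports = IntStream.rangeClosed(8081, 8090).iterator();
        while (serverChannel == null && ports.hasNext()) {
            int port = ports.next();
            try {
                serverChannel = bootstrap.bind(port).syncUninterruptibly().channel();
                System.out.println("Bound REST server to port " + port);
            } catch (Exception e) {
                if (!(e instanceof BindException)) {
                    throw e; // fail early for anything other than "port in use"
                }
            }
        }
        if (serverChannel == null) {
            throw new BindException("No free port in range 8081-8090");
        }
        serverChannel.closeFuture().sync();
        boss.shutdownGracefully();
        worker.shutdownGracefully();
    }
}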

Finally, the subclass's startInternal() is invoked: the method is declared abstract on the parent class, implemented by each subclass, and called from the parent's start():

@Override
public void startInternal() throws Exception {
    // leader election service: enter the current contender (this) into the election
   leaderElectionService.start(this);
   // start a periodic task that cleans up the ExecutionGraphCache
   startExecutionGraphCacheCleanupTask();

   if (hasWebUI) {
      log.info("Web frontend listening at {}.", getRestBaseUrl());
   }
}
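startExecutionGraphCacheCleanupTask() amounts to a fixed-delay task that evicts stale entries from the ExecutionGraph cache. A minimal sketch of that cleanup-loop pattern follows, with hypothetical types and a simple TTL-based eviction rule (the 60s TTL is an assumption, not Flink's default):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class CacheCleanupExample {
    // Hypothetical cache: key -> insertion timestamp, standing in for cached ExecutionGraphs.
    private final Map<String, Long> cache = new ConcurrentHashMap<>();
    private final long ttlMillis = 60_000; // entries older than this get evicted

    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor();

    void startCleanupTask() {
        // Same idea as startExecutionGraphCacheCleanupTask(): evict at a fixed interval.
        executor.scheduleWithFixedDelay(this::cleanup, ttlMillis, ttlMillis, TimeUnit.MILLISECONDS);
    }

    private void cleanup() {
        long now = System.currentTimeMillis();
        cache.entrySet().removeIf(e -> now - e.getValue() >= ttlMillis);
    }
}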

Here, the purpose of the election is to write the server's port into ZooKeeper, because the server's port is chosen at random. The election therefore serves two purposes: HA for the component itself, and service discovery, since clients fetch the server's address from ZooKeeper. Step into leaderElectionService.start(this):

@Override
public final void start(LeaderContender contender) throws Exception {
   checkNotNull(contender, "Contender must not be null.");
   Preconditions.checkState(leaderContender == null, "Contender was already set.");

   synchronized (lock) {
      leaderContender = contender;
      leaderElectionDriver = leaderElectionDriverFactory.createLeaderElectionDriver(
         this, new LeaderElectionFatalErrorHandler(), leaderContender.getDescription());
      LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);

      running = true;
   }
}

Step into createLeaderElectionDriver:

@Override
public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
      LeaderElectionEventHandler leaderEventHandler,
      FatalErrorHandler fatalErrorHandler,
      String leaderContenderDescription) throws Exception {
   return new ZooKeeperLeaderElectionDriver(
      client, latchPath, leaderPath, leaderEventHandler, fatalErrorHandler, leaderContenderDescription);
}

This returns a ZooKeeperLeaderElectionDriver:

public ZooKeeperLeaderElectionDriver(
      CuratorFramework client,
      String latchPath,
      String leaderPath,
      LeaderElectionEventHandler leaderElectionEventHandler,
      FatalErrorHandler fatalErrorHandler,
      String leaderContenderDescription) throws Exception {
   this.client = checkNotNull(client);
   this.leaderPath = checkNotNull(leaderPath);
   this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
   this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
   this.leaderContenderDescription = checkNotNull(leaderContenderDescription);

   leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
   cache = new NodeCache(client, leaderPath);

   client.getUnhandledErrorListenable().addListener(this);

   running = true;

   leaderLatch.addListener(this);
   leaderLatch.start();

   cache.getListenable().addListener(this);
   cache.start();

   client.getConnectionStateListenable().addListener(listener);
}

leaderLatch.start() is the election itself; the result is delivered asynchronously through listeners:

public void start() throws Exception {
    Preconditions.checkState(this.state.compareAndSet(LeaderLatch.State.LATENT, LeaderLatch.State.STARTED), "Cannot be started more than once");
    this.startTask.set(AfterConnectionEstablished.execute(this.client, new Runnable() {
        public void run() {
            try {
                LeaderLatch.this.internalStart();
            } finally {
                LeaderLatch.this.startTask.set((Object)null);
            }

        }
    }));
}

leaderLatch.addListener(this) requires a listener argument, and every component taking part in the election must implement this listener interface, LeaderLatchListener (the this passed here is such an implementation):

public interface LeaderLatchListener {
    void isLeader();

    void notLeader();
}

After the election finishes, the Curator framework calls one of these two methods on the LeaderLatchListener implementation (this). If the election was won, isLeader() is invoked:

@Override
public void isLeader() {
   leaderElectionEventHandler.onGrantLeadership();
}

Step into leaderElectionEventHandler.onGrantLeadership():

@Override
@GuardedBy("lock")
public void onGrantLeadership() {
   synchronized (lock) {
      if (running) {
         issuedLeaderSessionID = UUID.randomUUID();
         clearConfirmedLeaderInformation();

         if (LOG.isDebugEnabled()) {
            LOG.debug(
               "Grant leadership to contender {} with session ID {}.",
               leaderContender.getDescription(),
               issuedLeaderSessionID);
         }
            // grant the leader role to the current component
         leaderContender.grantLeadership(issuedLeaderSessionID);
      } else {
         if (LOG.isDebugEnabled()) {
            LOG.debug("Ignoring the grant leadership notification since the {} has " +
               "already been closed.", leaderElectionDriver);
         }
      }
   }

}

leaderContender.grantLeadership(issuedLeaderSessionID);

@Override
public void grantLeadership(final UUID leaderSessionID) {
   log.info("{} was granted leadership with leaderSessionID={}", getRestBaseUrl(), leaderSessionID);
   leaderElectionService.confirmLeadership(leaderSessionID, getRestBaseUrl());
}

leaderElectionService.confirmLeadership

@Override
public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
   if (LOG.isDebugEnabled()) {
      LOG.debug(
         "Confirm leader session ID {} for leader {}.",
         leaderSessionID,
         leaderAddress);
   }

   checkNotNull(leaderSessionID);

   synchronized (lock) {
      if (hasLeadership(leaderSessionID)) { // does the current component hold leadership?
         if (running) {
            confirmLeaderInformation(leaderSessionID, leaderAddress);
         } else {
            if (LOG.isDebugEnabled()) {
               LOG.debug("Ignoring the leader session Id {} confirmation, since the " +
                  "LeaderElectionService has already been stopped.", leaderSessionID);
            }
         }
      } else {
         // Received an old confirmation call
         if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
            if (LOG.isDebugEnabled()) {
               LOG.debug("Receive an old confirmation call of leader session ID {}, " +
                  "current issued session ID is {}", leaderSessionID, issuedLeaderSessionID);
            }
         } else {
            LOG.warn("The leader session ID {} was confirmed even though the " +
               "corresponding JobManager was not elected as the leader.", leaderSessionID);
         }
      }
   }
}

confirmLeaderInformation(leaderSessionID, leaderAddress);

@GuardedBy("lock")
private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
   confirmedLeaderSessionID = leaderSessionID;
   confirmedLeaderAddress = leaderAddress;
   leaderElectionDriver.writeLeaderInformation(
      LeaderInformation.known(confirmedLeaderSessionID, confirmedLeaderAddress));
}

In other words, once the component confirms it is leader, it writes its information into ZooKeeper. The rough logic of the ZK write:

try {
   final ByteArrayOutputStream baos = new ByteArrayOutputStream();
   final ObjectOutputStream oos = new ObjectOutputStream(baos);

   oos.writeUTF(leaderInformation.getLeaderAddress());
   oos.writeObject(leaderInformation.getLeaderSessionID());

   oos.close();

   boolean dataWritten = false;

   while (!dataWritten && leaderLatch.hasLeadership()) {
      Stat stat = client.checkExists().forPath(leaderPath);

      if (stat != null) {
         long owner = stat.getEphemeralOwner();
         long sessionID = client.getZookeeperClient().getZooKeeper().getSessionId();

         if (owner == sessionID) {
            try {
               client.setData().forPath(leaderPath, baos.toByteArray());

               dataWritten = true;
            } catch (KeeperException.NoNodeException noNode) {
               // node was deleted in the meantime
            }
         } else {
            try {
               client.delete().forPath(leaderPath);
            } catch (KeeperException.NoNodeException noNode) {
               // node was deleted in the meantime --> try again
            }
         }
      } else {
         try {
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
                  leaderPath,
                  baos.toByteArray());

            dataWritten = true;
         } catch (KeeperException.NodeExistsException nodeExists) {
            // node has been created in the meantime --> try again
         }
      }
   }

   if (LOG.isDebugEnabled()) {
      LOG.debug("Successfully wrote leader information: {}.", leaderInformation);
   }
} 

At this point, the component has written its own information into ZooKeeper; a successful write means it holds the leader role, and clients can discover the server's address through ZooKeeper. The elections of the remaining components below all follow essentially the same process.
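To make the cycle concrete, here is a minimal, self-contained Curator sketch of exactly what was described: compete for leadership with a LeaderLatch, publish the winner's address in an ephemeral znode, and read it back on the client side for service discovery. The ZooKeeper paths and the address are hypothetical, and error handling is trimmed for brevity:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

import java.nio.charset.StandardCharsets;

public class LeaderElectionExample {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        final String latchPath = "/demo/latch";   // where contenders queue up
        final String leaderPath = "/demo/leader"; // where the winner publishes its address
        final String myAddress = "http://host-a:8081"; // hypothetical REST address

        LeaderLatch latch = new LeaderLatch(client, latchPath);
        latch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                // Election won: publish our address in an EPHEMERAL node, so it
                // disappears automatically if this process dies and triggers re-election.
                try {
                    client.create().creatingParentsIfNeeded()
                          .withMode(CreateMode.EPHEMERAL)
                          .forPath(leaderPath, myAddress.getBytes(StandardCharsets.UTF_8));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void notLeader() {
                System.out.println("Lost leadership");
            }
        });
        latch.start(); // asynchronous: the listener is called back once the election settles

        // Service-discovery side: a client looks up the current leader's address.
        // (A real client would watch the node instead of sleeping.)
        Thread.sleep(2000);
        byte[] data = client.getData().forPath(leaderPath);
        System.out.println("Leader is at: " + new String(data, StandardCharsets.UTF_8));
    }
}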

Creating the ResourceManager

The second component to be created and started is the ResourceManager:

resourceManager = resourceManagerFactory.createResourceManager(
   configuration,
   ResourceID.generate(),
   rpcService,
   highAvailabilityServices,
   heartbeatServices,
   fatalErrorHandler,
   new ClusterInformation(hostname, blobServer.getPort()),
   webMonitorEndpoint.getRestBaseUrl(),
   metricRegistry,
   hostname,
   ioExecutor);
   // ... the Dispatcher component is also created here, covered later
   log.debug("Starting ResourceManager.");
    resourceManager.start(); // start it

When this start() executes, the ResourceManager sends a message to itself, which causes its own onStart() method to run; a minimal sketch of the underlying pattern follows.
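A minimal sketch of this "tell yourself a START control message" pattern with Akka classic actors, whose names are hypothetical stand-ins (Flink's real actor implementation is AkkaRpcActor):

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class RpcStartExample {

    // Control message, analogous to Flink's ControlMessages.START.
    enum ControlMessages { START }

    // A minimal "endpoint" actor: receiving START flips it to running,
    // which is what triggers the endpoint's onStart() logic in Flink.
    static class EndpointActor extends AbstractActor {
        private boolean running = false;

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                .matchEquals(ControlMessages.START, msg -> {
                    running = true;
                    onStart(); // Flink dispatches to RpcEndpoint.onStart() here
                })
                .build();
        }

        private void onStart() {
            System.out.println("endpoint started, ready to process RPCs, running=" + running);
        }
    }

    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("rpc");
        ActorRef endpoint = system.actorOf(Props.create(EndpointActor.class));

        // The equivalent of AkkaInvocationHandler.start():
        endpoint.tell(ControlMessages.START, ActorRef.noSender());
    }
}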
The code that creates the ResourceManager:

public ResourceManager<T> createResourceManager(
      Configuration configuration,
      ResourceID resourceId,
      RpcService rpcService,
      HighAvailabilityServices highAvailabilityServices,
      HeartbeatServices heartbeatServices,
      FatalErrorHandler fatalErrorHandler,
      ClusterInformation clusterInformation,
      @Nullable String webInterfaceUrl,
      MetricRegistry metricRegistry,
      String hostname,
      Executor ioExecutor) throws Exception {

   final ResourceManagerMetricGroup resourceManagerMetricGroup = ResourceManagerMetricGroup.create(metricRegistry, hostname);
   final SlotManagerMetricGroup slotManagerMetricGroup = SlotManagerMetricGroup.create(metricRegistry, hostname);
    // 1. create the ResourceManagerRuntimeServices
   final ResourceManagerRuntimeServices resourceManagerRuntimeServices = createResourceManagerRuntimeServices(
      configuration, rpcService, highAvailabilityServices, slotManagerMetricGroup);
   // 2. create the ResourceManager itself
   return createResourceManager(
      configuration,
      resourceId,
      rpcService,
      highAvailabilityServices,
      heartbeatServices,
      fatalErrorHandler,
      clusterInformation,
      webInterfaceUrl,
      resourceManagerMetricGroup,
      resourceManagerRuntimeServices,
      ioExecutor);
}

There are two important calls here:

(1) createResourceManagerRuntimeServices, which creates the ResourceManagerRuntimeServices
(2) createResourceManager

(1) createResourceManagerRuntimeServices:

private ResourceManagerRuntimeServices createResourceManagerRuntimeServices(
      Configuration configuration,
      RpcService rpcService,
      HighAvailabilityServices highAvailabilityServices,
      SlotManagerMetricGroup slotManagerMetricGroup) throws ConfigurationException {

   return ResourceManagerRuntimeServices.fromConfiguration(
      createResourceManagerRuntimeServicesConfiguration(configuration),
      highAvailabilityServices,
      rpcService.getScheduledExecutor(),
      slotManagerMetricGroup);
}

fromConfiguration:

public static ResourceManagerRuntimeServices fromConfiguration(
      ResourceManagerRuntimeServicesConfiguration configuration,
      HighAvailabilityServices highAvailabilityServices,
      ScheduledExecutor scheduledExecutor,
      SlotManagerMetricGroup slotManagerMetricGroup) {
   // 1. create a SlotManager
   final SlotManager slotManager = createSlotManager(configuration, scheduledExecutor, slotManagerMetricGroup);
    // 2. create a JobLeaderIdService
   final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
      highAvailabilityServices,
      scheduledExecutor,
      configuration.getJobTimeout());
  // return a ResourceManagerRuntimeServices wrapping the SlotManager and the JobLeaderIdService
   return new ResourceManagerRuntimeServices(slotManager, jobLeaderIdService);
}

So the ResourceManager mainly starts two services: the SlotManager and the JobLeaderIdService.
(2) createResourceManager; the concrete implementation lives in StandaloneResourceManagerFactory:

protected ResourceManager<ResourceID> createResourceManager(
   Configuration configuration,
   ResourceID resourceId,
   RpcService rpcService,
   HighAvailabilityServices highAvailabilityServices,
   HeartbeatServices heartbeatServices,
   FatalErrorHandler fatalErrorHandler,
   ClusterInformation clusterInformation,
   @Nullable String webInterfaceUrl,
   ResourceManagerMetricGroup resourceManagerMetricGroup,
   ResourceManagerRuntimeServices resourceManagerRuntimeServices,
   Executor ioExecutor) {

   final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration);

   return new StandaloneResourceManager(
      rpcService,
      resourceId,
      highAvailabilityServices,
      heartbeatServices,
      resourceManagerRuntimeServices.getSlotManager(),
      ResourceManagerPartitionTrackerImpl::new,
      resourceManagerRuntimeServices.getJobLeaderIdService(),
      clusterInformation,
      fatalErrorHandler,
      resourceManagerMetricGroup,
      standaloneClusterStartupPeriodTime,
      AkkaUtils.getTimeoutAsTime(configuration),
      ioExecutor);
}

Once the ResourceManager is created, it is started by calling resourceManager.start():

// ------------------------------------------------------------------------
//  Start & shutdown & lifecycle callbacks
// ------------------------------------------------------------------------

/**
 * Triggers start of the rpc endpoint. This tells the underlying rpc server that the rpc endpoint is ready
 * to process remote procedure calls.
 */
public final void start() {
   rpcServer.start();
}

RpcServer's start() is a framework-level method; it dispatches to the start() of the subclass AkkaInvocationHandler:

class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer
@Override
public void start() {
   rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
}

This rpcEndpoint is an ActorRef, and rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender()) sends a START message to itself. Execution then routes to StandaloneResourceManager's onStart; what actually runs is the onStart method of its parent class, ResourceManager:

@Override
public final void onStart() throws Exception {
   try {
      startResourceManagerServices();
   } catch (Throwable t) {
      final ResourceManagerException exception = new ResourceManagerException(String.format("Could not start the ResourceManager %s", getAddress()), t);
      onFatalError(exception);
      throw exception;
   }
}
private void startResourceManagerServices() throws Exception {
   try {
       // leader election service: writes its own information into ZK
      leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();

      initialize();

      leaderElectionService.start(this);
      jobLeaderIdService.start(new JobLeaderIdActionsImpl());

      registerTaskExecutorMetrics();
   } catch (Exception e) {
      handleStartResourceManagerServicesException(e);
   }
}

The whole election process is the same as for the REST service above. It eventually ends up in:

public void onGrantLeadership() {
   synchronized (lock) {
      if (running) {
         issuedLeaderSessionID = UUID.randomUUID();
         clearConfirmedLeaderInformation();

         if (LOG.isDebugEnabled()) {
            LOG.debug(
               "Grant leadership to contender {} with session ID {}.",
               leaderContender.getDescription(),
               issuedLeaderSessionID);
         }
         // execution arrives here
         leaderContender.grantLeadership(issuedLeaderSessionID);
      } else {
         if (LOG.isDebugEnabled()) {
            LOG.debug("Ignoring the grant leadership notification since the {} has " +
               "already been closed.", leaderElectionDriver);
         }
      }
   }

}

and finally calls the ResourceManager's grantLeadership method:

@Override
public void grantLeadership(final UUID newLeaderSessionID) {
   final CompletableFuture<Boolean> acceptLeadershipFuture = clearStateFuture
      .thenComposeAsync((ignored) -> tryAcceptLeadership(newLeaderSessionID), getUnfencedMainThreadExecutor());

   final CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture.thenAcceptAsync(
      (acceptLeadership) -> {
         if (acceptLeadership) {
            // confirming the leader session ID might be blocking,
            leaderElectionService.confirmLeadership(newLeaderSessionID, getAddress());
         }
      },
      ioExecutor);

   confirmationFuture.whenComplete(
      (Void ignored, Throwable throwable) -> {
         if (throwable != null) {
            onFatalError(ExceptionUtils.stripCompletionException(throwable));
         }
      });
}

There are two important parts here. First, look at tryAcceptLeadership(newLeaderSessionID), which is composed onto getUnfencedMainThreadExecutor():

private CompletableFuture<Boolean> tryAcceptLeadership(final UUID newLeaderSessionID) {
   if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
      final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);

      log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);

      // clear the state if we've been the leader before
      if (getFencingToken() != null) {
         clearStateInternal();
      }

      setFencingToken(newResourceManagerId);
      // only the ResourceManager that became leader reaches this point and starts its services
      startServicesOnLeadership();

      return prepareLeadershipAsync().thenApply(ignored -> true);
   } else {
      return CompletableFuture.completedFuture(false);
   }
}

startServicesOnLeadership(): only the ResourceManager that won the election executes this method and starts its services: two heartbeat services and two periodic tasks.

private void startServicesOnLeadership() {
   // start the heartbeat services
   startHeartbeatServices();
   // start the SlotManager, which starts two periodic tasks:
   // 1. check TaskExecutor liveness (considered dead after 50s without a heartbeat)
   // 2. check the status of pending slot requests (a request that is not fulfilled
   //    within the timeout is abandoned)
   slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());

   onLeadership();
}

1. Starting the heartbeat services:

private void startHeartbeatServices() {
   // heartbeat with the TaskManagers
   taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
      resourceId,
      new TaskManagerHeartbeatListener(),
      getMainThreadExecutor(),
      log);
   // heartbeat with the JobManagers
   jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
      resourceId,
      new JobManagerHeartbeatListener(),
      getMainThreadExecutor(),
      log);
}

createHeartbeatManagerSender is defined in HeartbeatServices; there is a sibling method, createHeartbeatManager. The difference is that the Sender variant is the active side, the one that initiates the heartbeat requests.

public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
   ResourceID resourceId,
   HeartbeatListener<I, O> heartbeatListener,
   ScheduledExecutor mainThreadExecutor,
   Logger log) {

   return new HeartbeatManagerSenderImpl<>(
      heartbeatInterval,
      heartbeatTimeout,
      resourceId,
      heartbeatListener,
      mainThreadExecutor,
      log);
}
HeartbeatManagerSenderImpl(
      long heartbeatPeriod,
      long heartbeatTimeout,
      ResourceID ownResourceID,
      HeartbeatListener<I, O> heartbeatListener,
      ScheduledExecutor mainThreadExecutor,
      Logger log) {
   this(
      heartbeatPeriod,
      heartbeatTimeout,
      ownResourceID,
      heartbeatListener,
      mainThreadExecutor,
      log,
      new HeartbeatMonitorImpl.Factory<>());
}
HeartbeatManagerSenderImpl(
      long heartbeatPeriod,
      long heartbeatTimeout,
      ResourceID ownResourceID,
      HeartbeatListener<I, O> heartbeatListener,
      ScheduledExecutor mainThreadExecutor,
      Logger log,
      HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {
   super(
      heartbeatTimeout,
      ownResourceID,
      heartbeatListener,
      mainThreadExecutor,
      log,
      heartbeatMonitorFactory);

   this.heartbeatPeriod = heartbeatPeriod;
   mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
}

mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS) schedules the object itself as a recurring task:

@Override
public void run() {
   if (!stopped) {
      log.debug("Trigger heartbeat request.");
      for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
         requestHeartbeat(heartbeatMonitor);
      }
      // scheduled once at startup; reschedules itself after heartbeatPeriod (every 10s)
      getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
   }
}

Each HeartbeatMonitor here corresponds to one monitored TaskExecutor.
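The self-rescheduling trick above (run once at startup, then schedule the next round only after the current one has run) is easy to reproduce with a plain ScheduledExecutorService. A minimal sketch with hypothetical target bookkeeping; the 10s period matches the interval mentioned above:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class HeartbeatSenderExample implements Runnable {

    private final long heartbeatPeriodMillis = 10_000; // 10s, like RM -> TM heartbeats
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor();
    // Hypothetical targets: id -> address of a monitored TaskExecutor.
    private final Map<String, String> heartbeatTargets = new ConcurrentHashMap<>();
    private volatile boolean stopped = false;

    void start() {
        // Kick off the first round immediately; run() reschedules itself afterwards.
        executor.schedule(this, 0L, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        if (!stopped) {
            for (Map.Entry<String, String> target : heartbeatTargets.entrySet()) {
                requestHeartbeat(target.getKey(), target.getValue());
            }
            // The HeartbeatManagerSenderImpl trick: schedule the next round
            // only after this one has finished.
            executor.schedule(this, heartbeatPeriodMillis, TimeUnit.MILLISECONDS);
        }
    }

    private void requestHeartbeat(String id, String address) {
        System.out.println("heartbeat request -> " + id + " @ " + address);
    }
}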

Next, look at slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl()):

@Override
public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
   LOG.info("Starting the SlotManager.");

   this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
   mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
   resourceActions = Preconditions.checkNotNull(newResourceActions);

   started = true;
   // the first periodic task
   taskManagerTimeoutsAndRedundancyCheck = scheduledExecutor.scheduleWithFixedDelay(
      () -> mainThreadExecutor.execute(
         () -> checkTaskManagerTimeoutsAndRedundancy()),
      0L,
      taskManagerTimeout.toMilliseconds(),
      TimeUnit.MILLISECONDS);
   // the second periodic task
   slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
      () -> mainThreadExecutor.execute(
         () -> checkSlotRequestTimeouts()),
      0L,
      slotRequestTimeout.toMilliseconds(),
      TimeUnit.MILLISECONDS);

   registerSlotManagerMetrics();
}

The first periodic task, checkTaskManagerTimeoutsAndRedundancy, checks which TaskManagers have timed out: the heartbeat interval is 10s, the check runs every 30s, and a TaskManager is considered timed out after 50s.

void checkTaskManagerTimeoutsAndRedundancy() {
   if (!taskManagerRegistrations.isEmpty()) {
      long currentTime = System.currentTimeMillis();

      ArrayList<TaskManagerRegistration> timedOutTaskManagers = new ArrayList<>(taskManagerRegistrations.size());

      // first retrieve the timed out TaskManagers
      for (TaskManagerRegistration taskManagerRegistration : taskManagerRegistrations.values()) {
         if (currentTime - taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) {
            // we collect the instance ids first in order to avoid concurrent modifications by the
            // ResourceActions.releaseResource call
            timedOutTaskManagers.add(taskManagerRegistration);
         }
      }

      int slotsDiff = redundantTaskManagerNum * numSlotsPerWorker - freeSlots.size();
      if (freeSlots.size() == slots.size()) {
         // No need to keep redundant taskManagers if no job is running.
         releaseTaskExecutors(timedOutTaskManagers, timedOutTaskManagers.size());
      } else if (slotsDiff > 0) {
         // Keep enough redundant taskManagers from time to time.
         int requiredTaskManagers = MathUtils.divideRoundUp(slotsDiff, numSlotsPerWorker);
         allocateRedundantTaskManagers(requiredTaskManagers);
      } else {
         // second we trigger the release resource callback which can decide upon the resource release
         int maxReleaseNum = (-slotsDiff) / numSlotsPerWorker;
         releaseTaskExecutors(timedOutTaskManagers, Math.min(maxReleaseNum, timedOutTaskManagers.size()));
      }
   }
}

The second periodic task, checkSlotRequestTimeouts, checks which slot requests have timed out; the timeout is 5 minutes.

private void checkSlotRequestTimeouts() {
   if (!pendingSlotRequests.isEmpty()) {
      long currentTime = System.currentTimeMillis();

      Iterator<Map.Entry<AllocationID, PendingSlotRequest>> slotRequestIterator = pendingSlotRequests.entrySet().iterator();

      while (slotRequestIterator.hasNext()) {
         PendingSlotRequest slotRequest = slotRequestIterator.next().getValue();

         if (currentTime - slotRequest.getCreationTimestamp() >= slotRequestTimeout.toMilliseconds()) {
            slotRequestIterator.remove();

            if (slotRequest.isAssigned()) {
               cancelPendingSlotRequest(slotRequest);
            }

            resourceActions.notifyAllocationFailure(
               slotRequest.getJobId(),
               slotRequest.getAllocationId(),
               new TimeoutException("The allocation could not be fulfilled in time."));
         }
      }
   }
}
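The shape of this check (walk the pending requests with an iterator, remove and fail anything older than the timeout) can be sketched independently of Flink. A minimal version with hypothetical request bookkeeping; the 5-minute timeout matches the value mentioned above:

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SlotRequestTimeoutExample {

    // Hypothetical pending requests: allocation id -> creation timestamp.
    private final Map<String, Long> pendingRequests = new ConcurrentHashMap<>();
    private final long requestTimeoutMillis = 5 * 60 * 1000; // 5 minutes, as above
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor();

    void start() {
        // Same scheduling shape as SlotManager.start(): a fixed-delay check task.
        executor.scheduleWithFixedDelay(
            this::checkTimeouts, 0L, requestTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    private void checkTimeouts() {
        long now = System.currentTimeMillis();
        Iterator<Map.Entry<String, Long>> it = pendingRequests.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> request = it.next();
            if (now - request.getValue() >= requestTimeoutMillis) {
                it.remove(); // give up on this request
                notifyAllocationFailure(request.getKey(),
                    new TimeoutException("The allocation could not be fulfilled in time."));
            }
        }
    }

    private void notifyAllocationFailure(String allocationId, Exception cause) {
        System.out.println("allocation " + allocationId + " failed: " + cause.getMessage());
    }
}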
Summary of the ResourceManager startup

The ResourceManager's election kicked off two heartbeat services and two periodic tasks.

Creating and starting the Dispatcher

// the create method internally creates the Dispatcher and starts it via start()
log.debug("Starting Dispatcher.");
dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
   highAvailabilityServices.getDispatcherLeaderElectionService(),
   fatalErrorHandler,
   new HaServicesJobGraphStoreFactory(highAvailabilityServices),
   ioExecutor,
   rpcService,
   partialDispatcherServices);

1. Creating via the factory: the dispatcherRunnerFactory.createDispatcherRunner method:

  /**
 * 1. creates the Dispatcher
 * 2. starts the Dispatcher
 */
    @Override
   public DispatcherRunner createDispatcherRunner(
         LeaderElectionService leaderElectionService,
         FatalErrorHandler fatalErrorHandler,
         JobGraphStoreFactory jobGraphStoreFactory,
         Executor ioExecutor,
         RpcService rpcService,
         PartialDispatcherServices partialDispatcherServices) throws Exception {
//
      final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory = dispatcherLeaderProcessFactoryFactory.createFactory(
         jobGraphStoreFactory,
         ioExecutor,
         rpcService,
         partialDispatcherServices,
         fatalErrorHandler);

      return DefaultDispatcherRunner.create(
         leaderElectionService,
         fatalErrorHandler,
         dispatcherLeaderProcessFactory);
   }

First, a factory is created:

public DispatcherLeaderProcessFactory createFactory(
      JobGraphStoreFactory jobGraphStoreFactory,
      Executor ioExecutor,
      RpcService rpcService,
      PartialDispatcherServices partialDispatcherServices,
      FatalErrorHandler fatalErrorHandler) {
   final AbstractDispatcherLeaderProcess.DispatcherGatewayServiceFactory dispatcherGatewayServiceFactory = new DefaultDispatcherGatewayServiceFactory(
      dispatcherFactory,
      rpcService,
      partialDispatcherServices);

   return new SessionDispatcherLeaderProcessFactory(
      dispatcherGatewayServiceFactory,
      jobGraphStoreFactory,
      ioExecutor,
      fatalErrorHandler);
}

SessionDispatcherLeaderProcessFactory is used to create SessionDispatcherLeaderProcess instances; the whole factory is handed over to the DispatcherLeaderProcessFactory.
Then the create method is called:

public static DispatcherRunner create(
      LeaderElectionService leaderElectionService,
      FatalErrorHandler fatalErrorHandler,
      DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory) throws Exception {
   // create the DefaultDispatcherRunner
   final DefaultDispatcherRunner dispatcherRunner = new DefaultDispatcherRunner(
      leaderElectionService,
      fatalErrorHandler,
      dispatcherLeaderProcessFactory);
   // kick off the DefaultDispatcherRunner's lifecycle
   return DispatcherRunnerLeaderElectionLifecycleManager.createFor(dispatcherRunner, leaderElectionService);
}
public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
   return new DispatcherRunnerLeaderElectionLifecycleManager<>(dispatcherRunner, leaderElectionService);
}

The election service is passed to this constructor as the leaderElectionService parameter:

private DispatcherRunnerLeaderElectionLifecycleManager(T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
   this.dispatcherRunner = dispatcherRunner;
   this.leaderElectionService = leaderElectionService;

   leaderElectionService.start(dispatcherRunner);
}

Inside, the election is started: leaderElectionService.start(dispatcherRunner):

public final void start(LeaderContender contender) throws Exception {
   checkNotNull(contender, "Contender must not be null.");
   Preconditions.checkState(leaderContender == null, "Contender was already set.");

   synchronized (lock) {
      leaderContender = contender;
      leaderElectionDriver = leaderElectionDriverFactory.createLeaderElectionDriver(
         this, new LeaderElectionFatalErrorHandler(), leaderContender.getDescription());
      LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);

      running = true;
   }
}

This time the leaderContender is the dispatcherRunner. Once again we arrive at ZooKeeperLeaderElectionDriver:

public ZooKeeperLeaderElectionDriver(
      CuratorFramework client,
      String latchPath,
      String leaderPath,
      LeaderElectionEventHandler leaderElectionEventHandler,
      FatalErrorHandler fatalErrorHandler,
      String leaderContenderDescription) throws Exception {
   this.client = checkNotNull(client);
   this.leaderPath = checkNotNull(leaderPath);
   this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
   this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
   this.leaderContenderDescription = checkNotNull(leaderContenderDescription);

   leaderLatch = new LeaderLatch(client, checkNotNull(latchPath));
   cache = new NodeCache(client, leaderPath);

   client.getUnhandledErrorListenable().addListener(this);

   running = true;

   leaderLatch.addListener(this);
   leaderLatch.start();

   cache.getListenable().addListener(this);
   cache.start();

   client.getConnectionStateListenable().addListener(listener);
}

On winning the election, ZooKeeperLeaderElectionDriver's isLeader() is called back:

@Override
public void isLeader() {
   leaderElectionEventHandler.onGrantLeadership();
}

which then reaches DefaultDispatcherRunner's grantLeadership method:

@Override
public void grantLeadership(UUID leaderSessionID) {
   runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID));
}

which calls startNewDispatcherLeaderProcess:

private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
   // stop the old instance
   stopDispatcherLeaderProcess();
   // create a new instance
   dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);
   // start the new instance
   final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
   FutureUtils.assertNoException(
      previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
}

The start method, newDispatcherLeaderProcess::start, invokes AbstractDispatcherLeaderProcess's start():

@Override
public final void start() {
   runIfStateIs(
      State.CREATED,
      this::startInternal);
}
private void startInternal() {
   log.info("Start {}.", getClass().getSimpleName());
   state = State.RUNNING;
   onStart();
}

which runs SessionDispatcherLeaderProcess's onStart method:

@Override
protected void onStart() {
   startServices();

   onGoingRecoveryOperation = recoverJobsAsync() // fetch all persisted JobGraph objects
      .thenAccept(this::createDispatcherIfRunning) // start a Dispatcher to run the recovered jobs
      .handle(this::onErrorIfRunning);
}

1. startServices: starts the jobGraphStore, which stores JobGraphs:

private void startServices() {
   try {
      jobGraphStore.start(this);
   } catch (Exception e) {
      throw new FlinkRuntimeException(
         String.format(
            "Could not start %s when trying to start the %s.",
            jobGraphStore.getClass().getSimpleName(),
            getClass().getSimpleName()),
         e);
   }
}

2. recoverJobsAsync: asynchronously recovers the jobs waiting to be executed. It fetches all the JobGraphs that need recovery; the actual recovery happens in the subsequent .thenAccept(this::createDispatcherIfRunning):

private CompletableFuture<Collection<JobGraph>> recoverJobsAsync() {
   return CompletableFuture.supplyAsync(
      this::recoverJobsIfRunning,
      ioExecutor);
}

This calls recoverJobsIfRunning:

private Collection<JobGraph> recoverJobsIfRunning() {
   return supplyUnsynchronizedIfRunning(this::recoverJobs).orElse(Collections.emptyList());

}

The job-recovery method, recoverJobs:

private Collection<JobGraph> recoverJobs() {
   log.info("Recover all persisted job graphs.");
   final Collection<JobID> jobIds = getJobIds();
   final Collection<JobGraph> recoveredJobGraphs = new ArrayList<>();

   for (JobID jobId : jobIds) {
      recoveredJobGraphs.add(recoverJob(jobId));
   }

   log.info("Successfully recovered {} persisted job graphs.", recoveredJobGraphs.size());

   return recoveredJobGraphs;
}

It fetches all job IDs, then recovers each job by its ID:

private Collection<JobID> getJobIds() {
   try {
      return jobGraphStore.getJobIds();
   } catch (Exception e) {
      throw new FlinkRuntimeException(
         "Could not retrieve job ids of persisted jobs.",
         e);
   }
}

recoverJob is the method that actually recovers a job:

private JobGraph recoverJob(JobID jobId) {
   log.info("Trying to recover job with job id {}.", jobId);
   try {
      return jobGraphStore.recoverJobGraph(jobId);
   } catch (Exception e) {
      throw new FlinkRuntimeException(
         String.format("Could not recover job with job id %s.", jobId),
         e);
   }
}

The implementation is DefaultJobGraphStore's recoverJobGraph, which retrieves the persisted JobGraph:

public JobGraph recoverJobGraph(JobID jobId) throws Exception {
   checkNotNull(jobId, "Job ID");

   LOG.debug("Recovering job graph {} from {}.", jobId, jobGraphStateHandleStore);

   final String name = jobGraphStoreUtil.jobIDToName(jobId);

   synchronized (lock) {
      verifyIsRunning();

      boolean success = false;

      RetrievableStateHandle<JobGraph> jobGraphRetrievableStateHandle;

      try {
         try {
            jobGraphRetrievableStateHandle = jobGraphStateHandleStore.getAndLock(name);
         } catch (StateHandleStore.NotExistException ignored) {
            success = true;
            return null;
         } catch (Exception e) {
            throw new FlinkException("Could not retrieve the submitted job graph state handle " +
               "for " + name + " from the submitted job graph store.", e);
         }

         JobGraph jobGraph;
         try {
            jobGraph = jobGraphRetrievableStateHandle.retrieveState();
         } catch (ClassNotFoundException cnfe) {
            throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + name +
               ". This indicates that you are trying to recover from state written by an " +
               "older Flink version which is not compatible. Try cleaning the state handle store.", cnfe);
         } catch (IOException ioe) {
            throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + name +
               ". This indicates that the retrieved state handle is broken. Try cleaning the state handle " +
               "store.", ioe);
         }

         addedJobGraphs.add(jobGraph.getJobID());

         LOG.info("Recovered {}.", jobGraph);

         success = true;
         return jobGraph;
      } finally {
         if (!success) {
            jobGraphStateHandleStore.release(name);
         }
      }
   }
}

With the JobGraphs to recover in hand, the next stage runs: .thenAccept(this::createDispatcherIfRunning):

private void createDispatcherIfRunning(Collection<JobGraph> jobGraphs) {
   runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs));
}

Every JobGraph needs a Dispatcher to schedule it:

private void createDispatcher(Collection<JobGraph> jobGraphs) {

   final DispatcherGatewayService dispatcherService = dispatcherGatewayServiceFactory.create(
      DispatcherId.fromUuid(getLeaderSessionId()),
      jobGraphs,
      jobGraphStore);

   completeDispatcherSetup(dispatcherService);
}

The recovered JobGraphs are actually launched inside the dispatcherGatewayServiceFactory.create method:

public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
      DispatcherId fencingToken,
      Collection<JobGraph> recoveredJobs,
      JobGraphWriter jobGraphWriter) {

   final Dispatcher dispatcher;
   try {
       // this returns a StandaloneDispatcher
      dispatcher = dispatcherFactory.createDispatcher(
         rpcService,
         fencingToken,
         recoveredJobs,
         (dispatcherGateway, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(),
         PartialDispatcherServicesWithJobGraphStore.from(partialDispatcherServices, jobGraphWriter));
   } catch (Exception e) {
      throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
   }

   dispatcher.start();

   return DefaultDispatcherGatewayService.from(dispatcher);
}

To summarize the whole flow:
onStart()
-> recoverJobsAsync() (recovers all persisted JobGraphs)
-> createDispatcherIfRunning()
-> runIfStateIs(State.RUNNING, () -> createDispatcher(jobGraphs))
-> createDispatcher(jobGraphs) (runs the recovered JobGraphs)

In other words, each JobGraph needs a Dispatcher to schedule and run it; a sketch of this async pipeline shape follows below.
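Here is a minimal sketch of that three-stage CompletableFuture pipeline: blocking recovery work runs on an IO pool, the result feeds the next stage, and any failure funnels into an error handler. The job types are hypothetical stand-ins for JobGraphs:

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RecoveryPipelineExample {

    private final ExecutorService ioExecutor = Executors.newFixedThreadPool(4);

    CompletableFuture<Void> onStart() {
        // Same shape as SessionDispatcherLeaderProcess.onStart().
        return CompletableFuture
            .supplyAsync(this::recoverJobs, ioExecutor)   // recoverJobsAsync()
            .thenAccept(this::createDispatcher)           // createDispatcherIfRunning()
            .handle((ignored, throwable) -> {             // onErrorIfRunning()
                if (throwable != null) {
                    System.err.println("recovery failed: " + throwable);
                }
                return null;
            });
    }

    private Collection<String> recoverJobs() {
        // Stand-in for reading persisted JobGraphs out of the JobGraphStore.
        return Arrays.asList("job-1", "job-2");
    }

    private void createDispatcher(Collection<String> jobs) {
        System.out.println("starting a dispatcher for recovered jobs: " + jobs);
    }
}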
createDispatcher itself returns a StandaloneDispatcher:

@Override
public StandaloneDispatcher createDispatcher(
      RpcService rpcService,
      DispatcherId fencingToken,
      Collection<JobGraph> recoveredJobs,
      DispatcherBootstrapFactory dispatcherBootstrapFactory,
      PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception {
   // create the default dispatcher
   return new StandaloneDispatcher(
      rpcService,
      fencingToken,
      recoveredJobs,
      dispatcherBootstrapFactory,
      DispatcherServices.from(partialDispatcherServicesWithJobGraphStore, DefaultJobManagerRunnerFactory.INSTANCE));
}

The Dispatcher's start again goes through rpcServer.start() and from there dispatches to onStart, that is, StandaloneDispatcher's onStart method:

public final void start() {
   rpcServer.start();
}

What actually runs is the parent class Dispatcher's onStart method:

@Override
public void onStart() throws Exception {
   try {
      startDispatcherServices();
   } catch (Throwable t) {
      final DispatcherException exception = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), t);
      onFatalError(exception);
      throw exception;
   }

   startRecoveredJobs();
   this.dispatcherBootstrap = this.dispatcherBootstrapFactory.create(
         getSelfGateway(DispatcherGateway.class),
         this.getRpcService().getScheduledExecutor() ,
         this::onFatalError);
}

startRecoveredJobs() schedules the recovered jobs for execution:

private void startRecoveredJobs() {
   for (JobGraph recoveredJob : recoveredJobs) {
      runRecoveredJob(recoveredJob);
   }
   recoveredJobs.clear();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
private void runRecoveredJob(final JobGraph recoveredJob) {
   checkNotNull(recoveredJob);
   try {
      runJob(recoveredJob, ExecutionType.RECOVERY);
   } catch (Throwable throwable) {
      onFatalError(new DispatcherException(String.format("Could not start recovered job %s.", recoveredJob.getJobID()), throwable));
   }
}

At this point the jobs are up and running; the execution mode is ExecutionType.RECOVERY.
After the jobs are recovered, completeDispatcherSetup is called to register some callbacks:

final void completeDispatcherSetup(DispatcherGatewayService dispatcherService) {
   runIfStateIs(
      State.RUNNING,
      () -> completeDispatcherSetupInternal(dispatcherService));
}
  • 1
  • 2
  • 3
  • 4
  • 5
private void completeDispatcherSetupInternal(DispatcherGatewayService createdDispatcherService) {
   Preconditions.checkState(dispatcherService == null, "The DispatcherGatewayService can only be set once.");
   dispatcherService = createdDispatcherService;
   dispatcherGatewayFuture.complete(createdDispatcherService.getGateway());
   FutureUtils.forward(createdDispatcherService.getShutDownFuture(), shutDownFuture);
   handleUnexpectedDispatcherServiceTermination(createdDispatcherService);
}

Summary

At this point, the three most important components of the JobManager have all been started:

WebMonitorEndpoint

  1. Initialize a pile of Handlers and one Router, sort and deduplicate the Handlers, then register each Handler with the Router
  2. Start a Netty server
  3. Start the internal services: run for election! The WebMonitorEndpoint is itself a LeaderContender; if it wins the election, isLeader() is called back
  4. Winning the election simply means writing the WebMonitorEndpoint's address and the ZooKeeper session ID into a znode
  5. Start a periodic cleanup task for the ExecutionGraph cache

ResourceManager

1. ResourceManager is a subclass of RpcEndpoint, so once the ResourceManager object is constructed, start() is called to start the RpcEndpoint, and execution then jumps to its onStart() method.
2. ResourceManager is a LeaderContender; it joins the election through the LeaderElectionService, and if it wins, isLeader() is called back.
3. The services the ResourceManager needs are started:
Two heartbeat services:
(1) heartbeat between the ResourceManager and the TaskExecutors
(2) heartbeat between the ResourceManager and the JobMasters
Two periodic tasks:
(1) checkTaskManagerTimeoutsAndRedundancy(), which checks TaskExecutor timeouts
(2) checkSlotRequestTimeouts(), which checks SlotRequest timeouts

Dispatcher startup and initialization

1. Start the JobGraphStore service
2. Recover the jobs from the JobGraphStore and run them, which requires starting a Dispatcher

And with that, the JobManager startup is complete!
