Flink checkpoint 是 Flink 中最重要的机制之一,是 Flink 实现精确一次(exactly-once)语义的重要保证。可以说,Flink 之所以如此成功,与它的 checkpoint 机制密不可分。
这次我详细梳理了一遍相关流程,方便大家更好地理解 checkpoint。下面先看一下我们在代码中一般是如何配置 Flink checkpoint 的:
- // Example job setup: how checkpointing is typically configured on the execution environment.
- // TODO 1. Prepare the execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
- // TODO 2. Configure checkpointing, the restart strategy, the state backend and the checkpoint storage
- env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE); // checkpoint interval 3000 ms, exactly-once mode
- env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L); // timeout 30 * 1000 ms
- env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
- // keep externalized checkpoints when the job is cancelled
- env.getCheckpointConfig().enableExternalizedCheckpoints(
- CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
- );
- // failure-rate restart strategy: failure rate 3, failure interval 1 day, restart delay 1 minute
- env.setRestartStrategy(RestartStrategies.failureRateRestart(
- 3, Time.days(1), Time.minutes(1)
- ));
- env.setStateBackend(new HashMapStateBackend());
- // checkpoints are written to this HDFS path
- env.getCheckpointConfig().setCheckpointStorage(
- "hdfs://hadoop102:8020/ck"
- );
- // ... business logic omitted ...
- // TODO 3. Launch the job
- env.execute();

- private StreamGraphGenerator getStreamGraphGenerator() { // builds a StreamGraphGenerator from this environment's transformations and settings
- if (transformations.size() <= 0) { // refuse to continue when no transformation has been registered
- throw new IllegalStateException(
- "No operators defined in streaming topology. Cannot execute.");
- }
- final RuntimeExecutionMode executionMode = configuration.get(ExecutionOptions.RUNTIME_MODE);
- return new StreamGraphGenerator(transformations, config, checkpointCfg, getConfiguration()) // checkpointCfg is handed to the generator here
- .setRuntimeExecutionMode(executionMode) // execution mode read from ExecutionOptions.RUNTIME_MODE (streaming or batch)
- .setStateBackend(defaultStateBackend) // state backend
- .setSavepointDir(defaultSavepointDirectory) // default savepoint directory (not the checkpoint directory)
- .setChaining(isChainingEnabled) // whether operator chaining is enabled
- .setUserArtifacts(cacheFile) // user artifacts taken from cacheFile
- .setTimeCharacteristic(timeCharacteristic) // time characteristic
- .setDefaultBufferTimeout(bufferTimeout); // default buffer timeout
- }

- public StreamGraph generate() { // builds a StreamGraph from the collected transformations and returns it
- // checkpointConfig and the savepoint restore settings are passed to the StreamGraph constructor
- streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
- shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
- configureStreamGraph(streamGraph);
- alreadyTransformed = new HashMap<>();
- // transform every entry of transformations; presumably this is what populates streamGraph (transform() is not shown here)
- for (Transformation<?> transformation : transformations) {
- transform(transformation);
- }
- for (StreamNode node : streamGraph.getStreamNodes()) {
- if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) { // a single matching in-edge turns unaligned checkpoints off for all in-edges of the node
- for (StreamEdge edge : node.getInEdges()) {
- edge.setSupportsUnalignedCheckpoints(false);
- }
- }
- }
- final StreamGraph builtStreamGraph = streamGraph; // keep the result before the generator's fields are reset below
- alreadyTransformed.clear();
- alreadyTransformed = null;
- streamGraph = null;
- return builtStreamGraph;
- }

这个方法中调用 StreamingJobGraphGenerator.createJobGraph方法
- private JobGraph createJobGraph() { // fills jobGraph from streamGraph (chaining, edges, memory, checkpointing, artifacts, execution config) and returns it
- preValidate(); // pre-validation step (body not shown here)
- jobGraph.setJobType(streamGraph.getJobType());
- jobGraph.enableApproximateLocalRecovery(
- streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
- // Generate deterministic hashes for the nodes in order to identify them across
- // submission iff they didn't change.
- Map<Integer, byte[]> hashes =
- defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
- // Generate legacy version hashes for backwards compatibility
- List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
- for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
- legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
- }
- setChaining(hashes, legacyHashes);
- setPhysicalEdges();
- setSlotSharingAndCoLocation();
- setManagedMemoryFraction(
- Collections.unmodifiableMap(jobVertices),
- Collections.unmodifiableMap(vertexConfigs),
- Collections.unmodifiableMap(chainedConfigs),
- id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
- id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());
- // configure checkpointing on the job graph
- configureCheckpointing();
- // copy the savepoint restore settings from the stream graph to the job graph
- jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
- final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
- JobGraphUtils.prepareUserArtifactEntries(
- streamGraph.getUserArtifacts().stream()
- .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
- jobGraph.getJobID());
- for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
- distributedCacheEntries.entrySet()) {
- jobGraph.addUserArtifact(entry.getKey(), entry.getValue()); // register each prepared user artifact on the job graph
- }
- // set the ExecutionConfig last when it has been finalized
- try {
- jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
- } catch (IOException e) { // NOTE(review): the caught IOException is not passed on as the cause
- throw new IllegalConfigurationException(
- "Could not serialize the ExecutionConfig."
- + "This indicates that non-serializable types (like custom serializers) were registered");
- }
- return jobGraph;
- }

在 preValidate 方法中会校验是否开启了 checkpoint、是否支持 unaligned checkpoint 等;在 configureCheckpointing() 方法中才真正将 streamGraph 的 checkpoint 相关配置传给 jobGraph:
- // Transfers the checkpoint-related configuration of streamGraph to jobGraph: derives the
- // interval and retention policy, serializes master hooks, state backend and checkpoint
- // storage, and stores everything on jobGraph as a JobCheckpointingSettings.
- private void configureCheckpointing() {
- CheckpointConfig cfg = streamGraph.getCheckpointConfig();
- long interval = cfg.getCheckpointInterval();
- if (interval < MINIMAL_CHECKPOINT_TIME) {
- // interval of max value means disable periodic checkpoint
- interval = Long.MAX_VALUE;
- }
- // --- configure options ---
- CheckpointRetentionPolicy retentionAfterTermination;
- if (cfg.isExternalizedCheckpointsEnabled()) {
- CheckpointConfig.ExternalizedCheckpointCleanup cleanup =
- cfg.getExternalizedCheckpointCleanup();
- // Sanity check
- if (cleanup == null) {
- throw new IllegalStateException(
- "Externalized checkpoints enabled, but no cleanup mode configured.");
- }
- // delete-on-cancellation maps to RETAIN_ON_FAILURE, otherwise RETAIN_ON_CANCELLATION
- retentionAfterTermination =
- cleanup.deleteOnCancellation()
- ? CheckpointRetentionPolicy.RETAIN_ON_FAILURE
- : CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION;
- } else {
- retentionAfterTermination = CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
- }
- // --- configure the master-side checkpoint hooks ---
- // collect a hook factory for every UDF operator whose user function implements WithMasterCheckpointHook
- final ArrayList<MasterTriggerRestoreHook.Factory> hooks = new ArrayList<>();
- for (StreamNode node : streamGraph.getStreamNodes()) {
- if (node.getOperatorFactory() instanceof UdfStreamOperatorFactory) {
- Function f =
- ((UdfStreamOperatorFactory) node.getOperatorFactory()).getUserFunction();
- if (f instanceof WithMasterCheckpointHook) {
- hooks.add(
- new FunctionMasterCheckpointHookFactory(
- (WithMasterCheckpointHook<?>) f));
- }
- }
- }
- // because the hooks can have user-defined code, they need to be stored as
- // eagerly serialized values
- final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks;
- if (hooks.isEmpty()) {
- serializedHooks = null;
- } else {
- try {
- MasterTriggerRestoreHook.Factory[] asArray =
- hooks.toArray(new MasterTriggerRestoreHook.Factory[hooks.size()]);
- serializedHooks = new SerializedValue<>(asArray);
- } catch (IOException e) {
- throw new FlinkRuntimeException("Trigger/restore hook is not serializable", e);
- }
- }
- // because the state backend can have user-defined code, it needs to be stored as
- // eagerly serialized value
- final SerializedValue<StateBackend> serializedStateBackend;
- if (streamGraph.getStateBackend() == null) {
- serializedStateBackend = null;
- } else {
- try {
- serializedStateBackend =
- new SerializedValue<StateBackend>(streamGraph.getStateBackend());
- } catch (IOException e) {
- throw new FlinkRuntimeException("State backend is not serializable", e);
- }
- }
- // because the checkpoint storage can have user-defined code, it needs to be stored as
- // eagerly serialized value
- final SerializedValue<CheckpointStorage> serializedCheckpointStorage;
- if (streamGraph.getCheckpointStorage() == null) {
- serializedCheckpointStorage = null;
- } else {
- try {
- serializedCheckpointStorage =
- new SerializedValue<>(streamGraph.getCheckpointStorage());
- } catch (IOException e) {
- throw new FlinkRuntimeException("Checkpoint storage is not serializable", e);
- }
- }
- // --- done, put it all together ---
- // create a JobCheckpointingSettings that bundles all checkpoint-related configuration
- JobCheckpointingSettings settings =
- new JobCheckpointingSettings(
- CheckpointCoordinatorConfiguration.builder()
- .setCheckpointInterval(interval)
- .setCheckpointTimeout(cfg.getCheckpointTimeout())
- .setMinPauseBetweenCheckpoints(cfg.getMinPauseBetweenCheckpoints())
- .setMaxConcurrentCheckpoints(cfg.getMaxConcurrentCheckpoints())
- .setCheckpointRetentionPolicy(retentionAfterTermination)
- .setExactlyOnce(
- getCheckpointingMode(cfg) == CheckpointingMode.EXACTLY_ONCE)
- .setPreferCheckpointForRecovery(cfg.isPreferCheckpointForRecovery())
- .setTolerableCheckpointFailureNumber(
- cfg.getTolerableCheckpointFailureNumber())
- .setUnalignedCheckpointsEnabled(cfg.isUnalignedCheckpointsEnabled())
- .setAlignmentTimeout(cfg.getAlignmentTimeout().toMillis())
- .build(),
- serializedStateBackend,
- serializedCheckpointStorage,
- serializedHooks);
- // hand the JobCheckpointingSettings over to the JobGraph
- jobGraph.setSnapshotSettings(settings);
- }

- @Override // handles a job submission request: validates the upload, loads the JobGraph, uploads its files and submits it through the gateway
- protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
- final Collection<File> uploadedFiles = request.getUploadedFiles();
- final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap( // index the uploaded files by file name
- File::getName,
- Path::fromLocalFile
- ));
- if (uploadedFiles.size() != nameToFile.size()) { // reject the request with BAD_REQUEST when the two sizes differ
- throw new RestHandlerException(
- String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
- uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
- nameToFile.size(),
- uploadedFiles.size()),
- HttpResponseStatus.BAD_REQUEST
- );
- }
- final JobSubmitRequestBody requestBody = request.getRequestBody();
- if (requestBody.jobGraphFileName == null) { // the job graph file name is mandatory
- throw new RestHandlerException(
- String.format("The %s field must not be omitted or be null.",
- JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
- HttpResponseStatus.BAD_REQUEST);
- }
- // load the JobGraph
- CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);
- // resolve the jar files named in the request
- Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);
- // resolve the artifact files named in the request
- Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);
- CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);
- // submit the finalized JobGraph through DispatcherGateway#submitJob
- CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
- return jobSubmissionFuture.thenCombine(jobGraphFuture, // the response body carries the URL path "/jobs/" + job id
- (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
- }

- @Override // entry point for job submission; rejects duplicates and partially resource-configured jobs, otherwise delegates to internalSubmitJob
- public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
- log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());
- try {
- if (isDuplicateJob(jobGraph.getJobID())) { // a job with this id is already known
- return FutureUtils.completedExceptionally(
- new DuplicateJobSubmissionException(jobGraph.getJobID()));
- } else if (isPartialResourceConfigured(jobGraph)) { // only some vertices have resources configured, which is rejected
- return FutureUtils.completedExceptionally(
- new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have " +
- "resources configured. The limitation will be removed in future versions."));
- } else {
- return internalSubmitJob(jobGraph);
- }
- } catch (FlinkException e) { // failures are reported through the returned future instead of being thrown
- return FutureUtils.completedExceptionally(e);
- }
- }

- private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) { // persists and runs the job, then maps the outcome to an Acknowledge or a JobSubmissionException
- log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
- final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJob(jobGraph.getJobID(), jobGraph,
- this::persistAndRunJob) // persistAndRunJob is passed as the action to waitForTerminatingJob
- .thenApply(ignored -> Acknowledge.get());
- return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
- if (throwable != null) { // submission failed: clean up the job data, log, and rethrow wrapped
- cleanUpJobData(jobGraph.getJobID(), true);
- ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(throwable);
- final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
- log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
- throw new CompletionException(
- new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", strippedThrowable));
- } else {
- return acknowledge;
- }
- }, ioExecutor); // the handler runs on ioExecutor
- }

- private void persistAndRunJob(JobGraph jobGraph) throws Exception { // stores the job graph, then runs it
- jobGraphWriter.putJobGraph(jobGraph); // hand the job graph to jobGraphWriter before running
- runJob(jobGraph, ExecutionType.SUBMISSION);
- }
- private void runJob(JobGraph jobGraph, ExecutionType executionType) { // creates the JobManagerRunner, tracks the job in runningJobs and wires up result handling and removal
- Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID())); // the job must not already be tracked as running
- long initializationTimestamp = System.currentTimeMillis();
- /* TODO create and start the JobManagerRunner (asynchronously, see createJobManagerRunner) */
- CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph, initializationTimestamp);
- DispatcherJob dispatcherJob = DispatcherJob.createFor(
- jobManagerRunnerFuture,
- jobGraph.getJobID(),
- jobGraph.getName(),
- initializationTimestamp);
- runningJobs.put(jobGraph.getJobID(), dispatcherJob);
- final JobID jobId = jobGraph.getJobID();
- // handle the job's result (or its failure) on the main thread executor
- final CompletableFuture<CleanupJobState> cleanupJobStateFuture = dispatcherJob.getResultFuture().handleAsync(
- (dispatcherJobResult, throwable) -> {
- Preconditions.checkState(runningJobs.get(jobId) == dispatcherJob, "The job entry in runningJobs must be bound to the lifetime of the DispatcherJob.");
- if (dispatcherJobResult != null) {
- return handleDispatcherJobResult(jobId, dispatcherJobResult, executionType);
- } else {
- return dispatcherJobFailed(jobId, throwable);
- }
- }, getMainThreadExecutor());
- // once the result has been handled (success or failure), remove the job using the resulting cleanup state
- final CompletableFuture<Void> jobTerminationFuture = cleanupJobStateFuture
- .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState))
- .thenCompose(Function.identity());
- FutureUtils.assertNoException(jobTerminationFuture);
- registerDispatcherJobTerminationFuture(jobId, jobTerminationFuture);
- }

- CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp) { // asynchronously creates and starts a JobManagerRunner for the given job graph
- final RpcService rpcService = getRpcService();
- return CompletableFuture.supplyAsync(
- () -> {
- try {
- /* TODO create the JobManagerRunner through the factory */
- JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
- jobGraph,
- configuration,
- rpcService,
- highAvailabilityServices,
- heartbeatServices,
- jobManagerSharedServices,
- new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
- fatalErrorHandler,
- initializationTimestamp);
- /* TODO start the runner */
- runner.start();
- return runner;
- } catch (Exception e) { // any failure is wrapped in a JobInitializationException inside a CompletionException
- throw new CompletionException(new JobInitializationException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
- }
- },
- ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on JobManager creation
- }

1.JobManagerFactory # createJobManagerRunner
2.DefaultJobManagerRunnerFactory # createJobManagerRunner
3.JobManagerRunnerImpl # JobManagerRunnerImpl
4.DefaultJobMasterServiceFactory # createJobMasterService
5.JobMaster # JobMaster ==> createScheduler
6.DefaultSchedulerFactory # createInstance
7.SchedulerBase # createAndRestoreExecutionGraph
- private ExecutionGraph createAndRestoreExecutionGraph( // creates the ExecutionGraph and, when a checkpoint coordinator exists, restores state from a checkpoint or else a savepoint
- JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
- ShuffleMaster<?> shuffleMaster,
- JobMasterPartitionTracker partitionTracker,
- ExecutionDeploymentTracker executionDeploymentTracker,
- long initializationTimestamp) throws Exception {
- ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker, executionDeploymentTracker, initializationTimestamp);
- final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();
- if (checkpointCoordinator != null) { // a null coordinator skips restoration entirely
- // check whether we find a valid checkpoint
- if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
- new HashSet<>(newExecutionGraph.getAllVertices().values()))) {
- // check whether we can restore from a savepoint
- tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
- }
- }
- return newExecutionGraph;
- }

SchedulerBase#createExecutionGraph 内部调用了 ExecutionGraphBuilder#buildGraph 方法:
- // Builds the ExecutionGraph for a JobGraph. Only the checkpoint-related tail of the method is
- // quoted here: when the JobGraph carries checkpointing settings, it creates the checkpoint
- // store and id counter, resolves the state backend, instantiates the master hooks and enables
- // checkpointing on the ExecutionGraph; finally it registers the graph's metrics.
- public static ExecutionGraph buildGraph(
- @Nullable ExecutionGraph prior,
- JobGraph jobGraph,
- Configuration jobManagerConfig,
- ScheduledExecutorService futureExecutor,
- Executor ioExecutor,
- SlotProvider slotProvider,
- ClassLoader classLoader,
- CheckpointRecoveryFactory recoveryFactory,
- Time rpcTimeout,
- RestartStrategy restartStrategy,
- MetricGroup metrics,
- BlobWriter blobWriter,
- Time allocationTimeout,
- Logger log,
- ShuffleMaster<?> shuffleMaster,
- JobMasterPartitionTracker partitionTracker,
- FailoverStrategy.Factory failoverStrategyFactory,
- ExecutionDeploymentListener executionDeploymentListener,
- ExecutionStateUpdateListener executionStateUpdateListener,
- long initializationTimestamp) throws JobExecutionException, JobException {
- // ... earlier part of the method omitted in this excerpt (executionGraph and jobId are defined there) ...
- // configure the state checkpointing
- // read the checkpointing settings stored on the jobGraph
- JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
- if (snapshotSettings != null) {
- // map the vertex ids for trigger / acknowledge / confirm to ExecutionJobVertex instances
- List<ExecutionJobVertex> triggerVertices =
- idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
- List<ExecutionJobVertex> ackVertices =
- idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);
- List<ExecutionJobVertex> confirmVertices =
- idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);
- CompletedCheckpointStore completedCheckpoints;
- CheckpointIDCounter checkpointIdCounter;
- try {
- int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
- CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);
- if (maxNumberOfCheckpointsToRetain <= 0) {
- // warning and use 1 as the default value if the setting in
- // state.checkpoints.max-retained-checkpoints is not greater than 0.
- log.warn("The setting for '{} : {}' is invalid. Using default value of {}",
- CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),
- maxNumberOfCheckpointsToRetain,
- CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());
- maxNumberOfCheckpointsToRetain = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
- }
- completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);
- checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);
- }
- catch (Exception e) {
- throw new JobExecutionException(jobId, "Failed to initialize high-availability checkpoint handler", e);
- }
- // Maximum number of remembered checkpoints
- int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
- CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
- historySize,
- ackVertices,
- snapshotSettings.getCheckpointCoordinatorConfiguration(),
- metrics);
- // load the state backend from the application settings
- final StateBackend applicationConfiguredBackend;
- final SerializedValue<StateBackend> serializedAppConfigured = snapshotSettings.getDefaultStateBackend();
- if (serializedAppConfigured == null) {
- applicationConfiguredBackend = null;
- }
- else {
- try {
- applicationConfiguredBackend = serializedAppConfigured.deserializeValue(classLoader);
- } catch (IOException | ClassNotFoundException e) {
- throw new JobExecutionException(jobId,
- "Could not deserialize application-defined state backend.", e);
- }
- }
- // resolve the effective state backend from the application setting, the configuration or the default
- final StateBackend rootBackend;
- try {
- rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(
- applicationConfiguredBackend, jobManagerConfig, classLoader, log);
- }
- catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
- throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
- }
- // instantiate the user-defined checkpoint hooks
- final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks = snapshotSettings.getMasterHooks();
- final List<MasterTriggerRestoreHook<?>> hooks;
- if (serializedHooks == null) {
- hooks = Collections.emptyList();
- }
- else {
- final MasterTriggerRestoreHook.Factory[] hookFactories;
- try {
- hookFactories = serializedHooks.deserializeValue(classLoader);
- }
- catch (IOException | ClassNotFoundException e) {
- throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);
- }
- // create the hooks with classLoader as the thread's context class loader; the original is restored in finally
- final Thread thread = Thread.currentThread();
- final ClassLoader originalClassLoader = thread.getContextClassLoader();
- thread.setContextClassLoader(classLoader);
- try {
- hooks = new ArrayList<>(hookFactories.length);
- for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
- hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
- }
- }
- finally {
- thread.setContextClassLoader(originalClassLoader);
- }
- }
- final CheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();
- // pass the checkpoint configuration and collaborators to the executionGraph
- executionGraph.enableCheckpointing(
- chkConfig,
- triggerVertices,
- ackVertices,
- confirmVertices,
- hooks,
- checkpointIdCounter,
- completedCheckpoints,
- rootBackend,
- checkpointStatsTracker);
- }
- // create all the metrics for the Execution Graph
- metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
- metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
- metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));
- executionGraph.getFailoverStrategy().registerMetrics(metrics);
- return executionGraph;
- }

