
Flink Checkpoint Source Code Deep Dive


Checkpointing is one of the most important parts of Flink and the key guarantee behind its exactly-once semantics; it is fair to say that Flink's success is inseparable from its checkpoint mechanism.

I had skimmed the Flink checkpoint source code before, but it still left me somewhat confused: I did not really understand how Flink picks up the checkpoint settings we configure in a job, or how those settings are propagated further down.

1. Checkpoint configuration in user code

This time I traced it through carefully so that we can understand checkpointing better. Let's first look at how a Flink checkpoint is typically configured in code:

// TODO 1. Set up the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);

// TODO 2. Checkpoint and state backend settings
env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
);
env.setRestartStrategy(RestartStrategies.failureRateRestart(
        3, Time.days(1), Time.minutes(1)
));
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(
        "hdfs://hadoop102:8020/ck"
);

// ... business logic omitted ...

// TODO 3. Submit the job
env.execute();

Here we can see that the checkpoint-related settings are all made by calling methods on the StreamExecutionEnvironment, so env is clearly the object that holds the checkpoint configuration from our code.

Stepping into enableCheckpointing(), we can see that the checkpoint parameters we configure are captured by a member variable of StreamExecutionEnvironment called checkpointCfg.
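To make that concrete, below is a minimal sketch, assuming the environment does nothing more than write the values into its CheckpointConfig member, of how enableCheckpointing(interval, mode) captures the settings. The wrapper class is hypothetical and not the actual Flink source; only the CheckpointConfig and CheckpointingMode types and the setters/getters used on them are real Flink APIs.

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;

// Hypothetical StreamExecutionEnvironment-like holder, used only to illustrate the hand-off.
public class CheckpointConfigCaptureSketch {

    // StreamExecutionEnvironment keeps one CheckpointConfig instance ("checkpointCfg")
    private final CheckpointConfig checkpointCfg = new CheckpointConfig();

    // Hedged sketch of enableCheckpointing(long, CheckpointingMode): it only writes into checkpointCfg
    public CheckpointConfigCaptureSketch enableCheckpointing(long intervalMs, CheckpointingMode mode) {
        checkpointCfg.setCheckpointingMode(mode);
        checkpointCfg.setCheckpointInterval(intervalMs);
        return this;
    }

    // getCheckpointConfig() always hands back that same object, so later setters
    // (timeout, min pause, externalized checkpoints, ...) accumulate in one place
    public CheckpointConfig getCheckpointConfig() {
        return checkpointCfg;
    }

    public static void main(String[] args) {
        CheckpointConfigCaptureSketch env = new CheckpointConfigCaptureSketch();
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);

        System.out.println(env.getCheckpointConfig().getCheckpointInterval());  // 3000
        System.out.println(env.getCheckpointConfig().getCheckpointingMode());   // EXACTLY_ONCE
    }
}

The real StreamExecutionEnvironment behaves the same way: getCheckpointConfig() returns the very checkpointCfg member, which is why everything we configured in section 1 ends up on a single object.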

So from here on we only need to follow how this checkpointCfg is passed along. Starting from the job's entry point, env.execute(), the next step is for Flink to translate our job into a StreamGraph.

Inside execute(), the getStreamGraph method is called to build the StreamGraph for the job.

2. Checkpoint configuration in the StreamGraph

In the previous step we saw that env.execute(jobName) calls getStreamGraph, the method that builds the StreamGraph. From the code below we can see that it first uses getStreamGraphGenerator to create a StreamGraphGenerator and then lets that generator produce the StreamGraph.

private StreamGraphGenerator getStreamGraphGenerator() {
    if (transformations.size() <= 0) {
        throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
    }
    final RuntimeExecutionMode executionMode = configuration.get(ExecutionOptions.RUNTIME_MODE);
    return new StreamGraphGenerator(transformations, config, checkpointCfg, getConfiguration())
            .setRuntimeExecutionMode(executionMode)        // execution mode: streaming or batch
            .setStateBackend(defaultStateBackend)          // state backend
            .setSavepointDir(defaultSavepointDirectory)    // savepoint directory
            .setChaining(isChainingEnabled)                // whether operator chaining is enabled
            .setUserArtifacts(cacheFile)                   // user artifacts (jars, cached files)
            .setTimeCharacteristic(timeCharacteristic)     // time characteristic
            .setDefaultBufferTimeout(bufferTimeout);       // buffer timeout
}

From this method body we can see that checkpointCfg is passed as a constructor argument to the StreamGraphGenerator, which is also configured with the state backend and other related settings.

public StreamGraph generate() {
    // the checkpoint config and savepoint restore settings are passed into the StreamGraph
    streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
    shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
    configureStreamGraph(streamGraph);

    alreadyTransformed = new HashMap<>();

    // transform the operators: iterate over the transformations collected on the
    // environment and add them to the StreamGraph
    for (Transformation<?> transformation : transformations) {
        transform(transformation);
    }

    for (StreamNode node : streamGraph.getStreamNodes()) {
        if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
            for (StreamEdge edge : node.getInEdges()) {
                edge.setSupportsUnalignedCheckpoints(false);
            }
        }
    }

    final StreamGraph builtStreamGraph = streamGraph;

    alreadyTransformed.clear();
    alreadyTransformed = null;
    streamGraph = null;

    return builtStreamGraph;
}

In StreamGraphGenerator's generate() method we can see that the checkpoint configuration is passed as a constructor argument to the StreamGraph, so the StreamGraph now holds our checkpoint settings.
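As a quick sanity check you can observe this hand-off from user code. The probe below relies on getStreamGraph() and StreamGraph#getCheckpointConfig(), which are internal APIs in Flink 1.12; treat the exact accessors as an assumption and do not keep such code in a production job.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class StreamGraphCheckpointProbe {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000L);

        // getStreamGraph() refuses to run without transformations, so add a trivial pipeline
        env.fromElements(1, 2, 3).print();

        // the generator handed our CheckpointConfig on to the StreamGraph
        StreamGraph streamGraph = env.getStreamGraph();
        System.out.println(streamGraph.getCheckpointConfig().getCheckpointInterval()); // expected: 3000
    }
}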

3. Checkpoint configuration in the JobGraph

Now let's go back to StreamExecutionEnvironment's execute method and look at how the checkpoint configuration is carried over while the JobGraph is generated. Starting from env.execute() and following a series of method calls, we eventually find that the JobGraph is produced by streamGraph.getJobGraph().

This method calls StreamingJobGraphGenerator.createJobGraph.

That method first creates a StreamingJobGraphGenerator and then invokes its createJobGraph method.
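Roughly, the delegation looks like the following (paraphrased from the Flink 1.12 sources; exact signatures may differ between versions and should be treated as an approximation):

// StreamGraph (paraphrased)
public JobGraph getJobGraph(@Nullable JobID jobID) {
    return StreamingJobGraphGenerator.createJobGraph(this, jobID);
}

// StreamingJobGraphGenerator (paraphrased)
public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
    // build a generator around the StreamGraph and let it produce the JobGraph
    return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
}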

Now let's look at the createJobGraph method in detail.

private JobGraph createJobGraph() {
    preValidate(); // pre-checks, e.g. whether checkpointing is properly enabled and configured
    jobGraph.setJobType(streamGraph.getJobType());

    jobGraph.enableApproximateLocalRecovery(
            streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());

    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    Map<Integer, byte[]> hashes =
            defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }

    setChaining(hashes, legacyHashes);
    setPhysicalEdges();
    setSlotSharingAndCoLocation();

    setManagedMemoryFraction(
            Collections.unmodifiableMap(jobVertices),
            Collections.unmodifiableMap(vertexConfigs),
            Collections.unmodifiableMap(chainedConfigs),
            id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
            id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());

    // configure checkpointing
    configureCheckpointing();

    // set the savepoint restore settings
    jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

    final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
            JobGraphUtils.prepareUserArtifactEntries(
                    streamGraph.getUserArtifacts().stream()
                            .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                    jobGraph.getJobID());

    for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
            distributedCacheEntries.entrySet()) {
        jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
    }

    // set the ExecutionConfig last when it has been finalized
    try {
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    } catch (IOException e) {
        throw new IllegalConfigurationException(
                "Could not serialize the ExecutionConfig."
                        + "This indicates that non-serializable types (like custom serializers) were registered");
    }

    return jobGraph;
}

preValidate() checks things such as whether checkpointing is enabled and whether unaligned checkpoints are supported; it is configureCheckpointing() that actually transfers the StreamGraph's checkpoint configuration to the JobGraph.

private void configureCheckpointing() {
    CheckpointConfig cfg = streamGraph.getCheckpointConfig();

    long interval = cfg.getCheckpointInterval();
    if (interval < MINIMAL_CHECKPOINT_TIME) {
        // interval of max value means disable periodic checkpoint
        interval = Long.MAX_VALUE;
    }

    // --- configure options ---

    CheckpointRetentionPolicy retentionAfterTermination;
    if (cfg.isExternalizedCheckpointsEnabled()) {
        CheckpointConfig.ExternalizedCheckpointCleanup cleanup =
                cfg.getExternalizedCheckpointCleanup();
        // Sanity check
        if (cleanup == null) {
            throw new IllegalStateException(
                    "Externalized checkpoints enabled, but no cleanup mode configured.");
        }
        retentionAfterTermination =
                cleanup.deleteOnCancellation()
                        ? CheckpointRetentionPolicy.RETAIN_ON_FAILURE
                        : CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION;
    } else {
        retentionAfterTermination = CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
    }

    // --- configure the master-side checkpoint hooks ---

    final ArrayList<MasterTriggerRestoreHook.Factory> hooks = new ArrayList<>();

    for (StreamNode node : streamGraph.getStreamNodes()) {
        if (node.getOperatorFactory() instanceof UdfStreamOperatorFactory) {
            Function f =
                    ((UdfStreamOperatorFactory) node.getOperatorFactory()).getUserFunction();

            if (f instanceof WithMasterCheckpointHook) {
                hooks.add(
                        new FunctionMasterCheckpointHookFactory(
                                (WithMasterCheckpointHook<?>) f));
            }
        }
    }

    // because the hooks can have user-defined code, they need to be stored as
    // eagerly serialized values
    final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks;
    if (hooks.isEmpty()) {
        serializedHooks = null;
    } else {
        try {
            MasterTriggerRestoreHook.Factory[] asArray =
                    hooks.toArray(new MasterTriggerRestoreHook.Factory[hooks.size()]);
            serializedHooks = new SerializedValue<>(asArray);
        } catch (IOException e) {
            throw new FlinkRuntimeException("Trigger/restore hook is not serializable", e);
        }
    }

    // because the state backend can have user-defined code, it needs to be stored as
    // eagerly serialized value
    final SerializedValue<StateBackend> serializedStateBackend;
    if (streamGraph.getStateBackend() == null) {
        serializedStateBackend = null;
    } else {
        try {
            serializedStateBackend =
                    new SerializedValue<StateBackend>(streamGraph.getStateBackend());
        } catch (IOException e) {
            throw new FlinkRuntimeException("State backend is not serializable", e);
        }
    }

    // because the checkpoint storage can have user-defined code, it needs to be stored as
    // eagerly serialized value
    final SerializedValue<CheckpointStorage> serializedCheckpointStorage;
    if (streamGraph.getCheckpointStorage() == null) {
        serializedCheckpointStorage = null;
    } else {
        try {
            serializedCheckpointStorage =
                    new SerializedValue<>(streamGraph.getCheckpointStorage());
        } catch (IOException e) {
            throw new FlinkRuntimeException("Checkpoint storage is not serializable", e);
        }
    }

    // --- done, put it all together ---

    // create a JobCheckpointingSettings object that bundles all checkpoint-related settings
    JobCheckpointingSettings settings =
            new JobCheckpointingSettings(
                    CheckpointCoordinatorConfiguration.builder()
                            .setCheckpointInterval(interval)
                            .setCheckpointTimeout(cfg.getCheckpointTimeout())
                            .setMinPauseBetweenCheckpoints(cfg.getMinPauseBetweenCheckpoints())
                            .setMaxConcurrentCheckpoints(cfg.getMaxConcurrentCheckpoints())
                            .setCheckpointRetentionPolicy(retentionAfterTermination)
                            .setExactlyOnce(
                                    getCheckpointingMode(cfg) == CheckpointingMode.EXACTLY_ONCE)
                            .setPreferCheckpointForRecovery(cfg.isPreferCheckpointForRecovery())
                            .setTolerableCheckpointFailureNumber(
                                    cfg.getTolerableCheckpointFailureNumber())
                            .setUnalignedCheckpointsEnabled(cfg.isUnalignedCheckpointsEnabled())
                            .setAlignmentTimeout(cfg.getAlignmentTimeout().toMillis())
                            .build(),
                    serializedStateBackend,
                    serializedCheckpointStorage,
                    serializedHooks);

    // hand the JobCheckpointingSettings over to the JobGraph
    jobGraph.setSnapshotSettings(settings);
}

In this method the CheckpointConfig is read from the StreamGraph, a JobCheckpointingSettings object is created, and the checkpoint-related settings from the StreamGraph are copied into it. On the JobGraph, the member variable that stores this checkpoint configuration is called snapshotSettings.
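If you want to confirm that the client-side JobGraph really carries these settings, a small probe like the one below works. It uses internal APIs again; the class and package names are taken from Flink 1.12 and should be treated as assumptions for other versions.

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobGraphCheckpointSettingsProbe {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000L);
        env.fromElements(1, 2, 3).print();

        // client-side translation: StreamGraph -> JobGraph
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();

        // the snapshotSettings member is exposed via getCheckpointingSettings()
        JobCheckpointingSettings settings = jobGraph.getCheckpointingSettings();
        System.out.println(
                settings.getCheckpointCoordinatorConfiguration().getCheckpointInterval()); // expected: 3000
    }
}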

Both of the steps above happen on the client side. In a YARN cluster setup, once the JobGraph has been generated the client creates a YARN client to submit the job; after submission, the JobSubmitHandler of the WebMonitorEndpoint processes it in its handleRequest method.

4. Checkpoint configuration in the ExecutionGraph

The ExecutionGraph is built when the JobMaster starts up and translates the JobGraph into an ExecutionGraph. Because the JobMaster startup process is fairly involved, we will analyze it in a bit more detail here, starting from the point where the cluster receives the JobGraph submitted by the client.

1.handleRequest

This method mainly retrieves the JobGraph, the jars and the related artifacts uploaded by the client, and then calls dispatcherGateway.submitJob to submit the JobGraph.

@Override
protected CompletableFuture<JobSubmitResponseBody> handleRequest(
        @Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request,
        @Nonnull DispatcherGateway gateway) throws RestHandlerException {
    final Collection<File> uploadedFiles = request.getUploadedFiles();
    final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
            File::getName,
            Path::fromLocalFile
    ));

    if (uploadedFiles.size() != nameToFile.size()) {
        throw new RestHandlerException(
                String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
                        uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
                        nameToFile.size(),
                        uploadedFiles.size()),
                HttpResponseStatus.BAD_REQUEST
        );
    }

    final JobSubmitRequestBody requestBody = request.getRequestBody();

    if (requestBody.jobGraphFileName == null) {
        throw new RestHandlerException(
                String.format("The %s field must not be omitted or be null.",
                        JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
                HttpResponseStatus.BAD_REQUEST);
    }

    // load the JobGraph
    CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);

    // collect the jar files to upload
    Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);

    // collect the artifacts (dependencies) to upload
    Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);

    CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);

    // call dispatcherGateway.submitJob
    CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));

    return jobSubmissionFuture.thenCombine(jobGraphFuture,
            (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
}

2.submitJob

dispatcherGateway.submitJob is declared on an interface and has two implementations, Dispatcher and MiniDispatcher; MiniDispatcher is used for local debugging. Here we follow Dispatcher's submitJob method, which performs a few checks on the JobGraph (whether the job ID is a duplicate, whether only part of the vertices have resources configured, and so on) and then calls internalSubmitJob.

@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
    log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());

    try {
        if (isDuplicateJob(jobGraph.getJobID())) {
            return FutureUtils.completedExceptionally(
                    new DuplicateJobSubmissionException(jobGraph.getJobID()));
        } else if (isPartialResourceConfigured(jobGraph)) {
            return FutureUtils.completedExceptionally(
                    new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have " +
                            "resources configured. The limitation will be removed in future versions."));
        } else {
            return internalSubmitJob(jobGraph);
        }
    } catch (FlinkException e) {
        return FutureUtils.completedExceptionally(e);
    }
}

3.internalSubmitJob

This method calls persistAndRunJob to persist and run the job, and then handles the outcome of the submission.

private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
    log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

    final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJob(jobGraph.getJobID(), jobGraph,
            this::persistAndRunJob)
            .thenApply(ignored -> Acknowledge.get());

    return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
        if (throwable != null) {
            cleanUpJobData(jobGraph.getJobID(), true);

            ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(throwable);
            final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
            log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
            throw new CompletionException(
                    new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", strippedThrowable));
        } else {
            return acknowledge;
        }
    }, ioExecutor);
}

4.persistAndRunJob

Here jobGraphWriter is responsible for persisting the JobGraph; afterwards runJob is called.

private void persistAndRunJob(JobGraph jobGraph) throws Exception {
    jobGraphWriter.putJobGraph(jobGraph);
    runJob(jobGraph, ExecutionType.SUBMISSION);
}

5.runJob

This method mainly does the following:

        1. Calls createJobManagerRunner to create and start the JobMaster.

        2. Handles the job's execution result.

        3. If the job fails, cleans up the job's execution state files and removes the job.

private void runJob(JobGraph jobGraph, ExecutionType executionType) {
    Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
    long initializationTimestamp = System.currentTimeMillis();

    /* TODO create and start the JobMaster */
    CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph, initializationTimestamp);

    DispatcherJob dispatcherJob = DispatcherJob.createFor(
            jobManagerRunnerFuture,
            jobGraph.getJobID(),
            jobGraph.getName(),
            initializationTimestamp);
    runningJobs.put(jobGraph.getJobID(), dispatcherJob);

    final JobID jobId = jobGraph.getJobID();

    // handle the job's execution result
    final CompletableFuture<CleanupJobState> cleanupJobStateFuture = dispatcherJob.getResultFuture().handleAsync(
            (dispatcherJobResult, throwable) -> {
                Preconditions.checkState(runningJobs.get(jobId) == dispatcherJob, "The job entry in runningJobs must be bound to the lifetime of the DispatcherJob.");

                if (dispatcherJobResult != null) {
                    return handleDispatcherJobResult(jobId, dispatcherJobResult, executionType);
                } else {
                    return dispatcherJobFailed(jobId, throwable);
                }
            }, getMainThreadExecutor());

    // after the job fails, clean up its execution state files and remove the job
    final CompletableFuture<Void> jobTerminationFuture = cleanupJobStateFuture
            .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState))
            .thenCompose(Function.identity());

    FutureUtils.assertNoException(jobTerminationFuture);
    registerDispatcherJobTerminationFuture(jobId, jobTerminationFuture);
}

6.createJobManagerRunner

This method calls jobManagerRunnerFactory.createJobManagerRunner() to create the JobMaster runner and then calls start() to start the JobMaster.

CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp) {
    final RpcService rpcService = getRpcService();
    return CompletableFuture.supplyAsync(
            () -> {
                try {
                    /* TODO create the JobMaster runner */
                    JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
                            jobGraph,
                            configuration,
                            rpcService,
                            highAvailabilityServices,
                            heartbeatServices,
                            jobManagerSharedServices,
                            new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                            fatalErrorHandler,
                            initializationTimestamp);
                    /* TODO start the JobMaster */
                    runner.start();
                    return runner;
                } catch (Exception e) {
                    throw new CompletionException(new JobInitializationException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
                }
            },
            ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on JobManager creation
}

7. Generation of the ExecutionGraph

Starting the JobMaster is rather involved and goes through quite a few steps; here we skip the less important ones and only list the call chain.

1. JobManagerRunnerFactory # createJobManagerRunner

2. DefaultJobManagerRunnerFactory # createJobManagerRunner

3. JobManagerRunnerImpl # JobManagerRunnerImpl (constructor)

4. DefaultJobMasterServiceFactory # createJobMasterService

5. JobMaster # JobMaster ==> createScheduler

6. DefaultSchedulerFactory # createInstance

7. SchedulerBase # createAndRestoreExecutionGraph

In this method, createExecutionGraph is called to build the ExecutionGraph; the CheckpointCoordinator responsible for the job's checkpoint work is created along the way, and it is then used to restore from the latest checkpoint, or from a savepoint, if one exists.

private ExecutionGraph createAndRestoreExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp) throws Exception {

    ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker, executionDeploymentTracker, initializationTimestamp);

    final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();

    if (checkpointCoordinator != null) {
        // check whether we find a valid checkpoint
        if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
                new HashSet<>(newExecutionGraph.getAllVertices().values()))) {

            // check whether we can restore from a savepoint
            tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
        }
    }

    return newExecutionGraph;
}

8. Passing the checkpoint configuration into the ExecutionGraph

SchedulerBase#createExecutionGraph internally calls ExecutionGraphBuilder#buildGraph.

That method does a lot of checkpoint-related work, for example:

        1. Reads the JobGraph's checkpoint settings

        2. Creates the CheckpointIDCounter and the CompletedCheckpointStore

        3. Loads the state backend

        4. Instantiates the user-defined checkpoint hooks

        5. Passes all of these checkpoint parameters into the ExecutionGraph

public static ExecutionGraph buildGraph(
        @Nullable ExecutionGraph prior,
        JobGraph jobGraph,
        Configuration jobManagerConfig,
        ScheduledExecutorService futureExecutor,
        Executor ioExecutor,
        SlotProvider slotProvider,
        ClassLoader classLoader,
        CheckpointRecoveryFactory recoveryFactory,
        Time rpcTimeout,
        RestartStrategy restartStrategy,
        MetricGroup metrics,
        BlobWriter blobWriter,
        Time allocationTimeout,
        Logger log,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        FailoverStrategy.Factory failoverStrategyFactory,
        ExecutionDeploymentListener executionDeploymentListener,
        ExecutionStateUpdateListener executionStateUpdateListener,
        long initializationTimestamp) throws JobExecutionException, JobException {

    // ...... (earlier part of the method omitted) ......

    // configure the state checkpointing
    // read the JobGraph's checkpoint settings
    JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
    if (snapshotSettings != null) {
        List<ExecutionJobVertex> triggerVertices =
                idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);

        List<ExecutionJobVertex> ackVertices =
                idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);

        List<ExecutionJobVertex> confirmVertices =
                idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);

        CompletedCheckpointStore completedCheckpoints;
        CheckpointIDCounter checkpointIdCounter;
        try {
            int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
                    CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);

            if (maxNumberOfCheckpointsToRetain <= 0) {
                // warning and use 1 as the default value if the setting in
                // state.checkpoints.max-retained-checkpoints is not greater than 0.
                log.warn("The setting for '{} : {}' is invalid. Using default value of {}",
                        CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),
                        maxNumberOfCheckpointsToRetain,
                        CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());

                maxNumberOfCheckpointsToRetain = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
            }

            completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);
            checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);
        }
        catch (Exception e) {
            throw new JobExecutionException(jobId, "Failed to initialize high-availability checkpoint handler", e);
        }

        // Maximum number of remembered checkpoints
        int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);

        CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
                historySize,
                ackVertices,
                snapshotSettings.getCheckpointCoordinatorConfiguration(),
                metrics);

        // load the state backend from the application settings
        final StateBackend applicationConfiguredBackend;
        final SerializedValue<StateBackend> serializedAppConfigured = snapshotSettings.getDefaultStateBackend();

        if (serializedAppConfigured == null) {
            applicationConfiguredBackend = null;
        }
        else {
            try {
                applicationConfiguredBackend = serializedAppConfigured.deserializeValue(classLoader);
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(jobId,
                        "Could not deserialize application-defined state backend.", e);
            }
        }

        // resolve the effective state backend
        final StateBackend rootBackend;
        try {
            rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(
                    applicationConfiguredBackend, jobManagerConfig, classLoader, log);
        }
        catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
            throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
        }

        // instantiate the user-defined checkpoint hooks
        final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks = snapshotSettings.getMasterHooks();
        final List<MasterTriggerRestoreHook<?>> hooks;

        if (serializedHooks == null) {
            hooks = Collections.emptyList();
        }
        else {
            final MasterTriggerRestoreHook.Factory[] hookFactories;
            try {
                hookFactories = serializedHooks.deserializeValue(classLoader);
            }
            catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);
            }

            final Thread thread = Thread.currentThread();
            final ClassLoader originalClassLoader = thread.getContextClassLoader();
            thread.setContextClassLoader(classLoader);

            try {
                hooks = new ArrayList<>(hookFactories.length);
                for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
                    hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
                }
            }
            finally {
                thread.setContextClassLoader(originalClassLoader);
            }
        }

        final CheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();

        // pass the checkpoint configuration into the ExecutionGraph
        executionGraph.enableCheckpointing(
                chkConfig,
                triggerVertices,
                ackVertices,
                confirmVertices,
                hooks,
                checkpointIdCounter,
                completedCheckpoints,
                rootBackend,
                checkpointStatsTracker);
    }

    // create all the metrics for the Execution Graph
    metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
    metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
    metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));

    executionGraph.getFailoverStrategy().registerMetrics(metrics);

    return executionGraph;
}

At this point the checkpoint configuration has made its way into the ExecutionGraph, and the CheckpointCoordinator that takes care of all checkpoint-related work has been created. After the JobMaster starts it begins scheduling, allocates resources to the tasks and deploys them, and only then are checkpoints actually triggered while the job runs.
