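Checkpointing is one of the most important parts of Flink and the foundation of its exactly-once guarantee; it is fair to say that much of Flink's success rests on its checkpoint mechanism.
I had skimmed Flink's checkpoint source code before but was still somewhat confused. In particular, I did not understand how Flink reads the checkpoint settings we configure in a job, or how it propagates them downstream.
This time I traced the path in detail to make checkpointing easier to understand. Let's first look at how checkpointing is typically configured in job code: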
- // TODO 1. Prepare the environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(4);
-
- // TODO 2. Checkpoint and state backend settings
- env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
- env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);
- env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
- env.getCheckpointConfig().enableExternalizedCheckpoints(
- CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
- );
- env.setRestartStrategy(RestartStrategies.failureRateRestart(
- 3, Time.days(1), Time.minutes(1)
- ));
- env.setStateBackend(new HashMapStateBackend());
- env.getCheckpointConfig().setCheckpointStorage(
- "hdfs://hadoop102:8020/ck"
- );
-
- // ... business logic omitted ...
-
- // TODO 3. Start the job
- env.execute();
Here the checkpoint settings are all applied by calling methods on StreamExecutionEnvironment, so env must be holding the checkpoint configuration from our code.
Stepping into enableCheckpointing() shows that the checkpoint parameters are stored in checkpointCfg, a member variable of StreamExecutionEnvironment.
So all we need to do is follow how checkpointCfg is passed along. Starting from the job's entry point, env.execute(), the next thing Flink does is translate the job into a StreamGraph.
env.execute(jobName) calls getStreamGraph, which builds the StreamGraph for the job. As the code below shows, it first uses getStreamGraphGenerator to create a StreamGraphGenerator and then uses that generator to produce the StreamGraph.
- private StreamGraphGenerator getStreamGraphGenerator() {
- if (transformations.size() <= 0) {
- throw new IllegalStateException(
- "No operators defined in streaming topology. Cannot execute.");
- }
-
- final RuntimeExecutionMode executionMode = configuration.get(ExecutionOptions.RUNTIME_MODE);
-
- return new StreamGraphGenerator(transformations, config, checkpointCfg, getConfiguration())
- .setRuntimeExecutionMode(executionMode) // execution mode: streaming or batch
- .setStateBackend(defaultStateBackend) // state backend
- .setSavepointDir(defaultSavepointDirectory) // default savepoint directory
- .setChaining(isChainingEnabled) // whether operator chaining is enabled
- .setUserArtifacts(cacheFile) // user artifacts (distributed cache files)
- .setTimeCharacteristic(timeCharacteristic) // time characteristic
- .setDefaultBufferTimeout(bufferTimeout); // default buffer timeout
- }
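The method body shows that checkpointCfg is passed to the StreamGraphGenerator as a constructor argument, and that the generator is also given the state backend and the default savepoint directory.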
- public StreamGraph generate() {
- // checkpointConfig and the savepoint restore settings are passed to the StreamGraph constructor
- streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
- shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
- configureStreamGraph(streamGraph);
-
- alreadyTransformed = new HashMap<>();
-
- // transform the operators: iterate over the transformations held by env and add each one to the StreamGraph
- for (Transformation<?> transformation : transformations) {
- transform(transformation);
- }
-
- for (StreamNode node : streamGraph.getStreamNodes()) {
- if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
- for (StreamEdge edge : node.getInEdges()) {
- edge.setSupportsUnalignedCheckpoints(false);
- }
- }
- }
-
- final StreamGraph builtStreamGraph = streamGraph;
-
- alreadyTransformed.clear();
- alreadyTransformed = null;
- streamGraph = null;
-
- return builtStreamGraph;
- }
In StreamGraphGenerator's generate() method, the checkpoint configuration is passed to the StreamGraph constructor, so the StreamGraph now holds our checkpoint settings.
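The hand-off traced so far (environment, then generator, then graph) can be sketched in plain Java. The classes below (`SketchCheckpointConfig`, `SketchEnv`, `SketchGraphGenerator`, `SketchStreamGraph`) are hypothetical stand-ins, not Flink classes, and the default values are assumptions made for this sketch. They only illustrate how settings stored on the environment become readable on the graph.

```java
// Hypothetical stand-ins for illustration only; these are not Flink classes.
final class SketchCheckpointConfig {
    long intervalMs = -1L;      // assumption: -1 means "checkpointing disabled" in this sketch
    long timeoutMs = 600_000L;  // assumption: arbitrary default for this sketch
}

final class SketchStreamGraph {
    private final SketchCheckpointConfig checkpointConfig;

    SketchStreamGraph(SketchCheckpointConfig checkpointConfig) {
        this.checkpointConfig = checkpointConfig;
    }

    SketchCheckpointConfig getCheckpointConfig() {
        return checkpointConfig;
    }
}

final class SketchGraphGenerator {
    private final SketchCheckpointConfig checkpointConfig;

    SketchGraphGenerator(SketchCheckpointConfig checkpointConfig) {
        this.checkpointConfig = checkpointConfig;
    }

    // The generator forwards the config it was constructed with to the graph.
    SketchStreamGraph generate() {
        return new SketchStreamGraph(checkpointConfig);
    }
}

final class SketchEnv {
    // The environment owns the config, like checkpointCfg in the walkthrough.
    private final SketchCheckpointConfig checkpointCfg = new SketchCheckpointConfig();

    SketchEnv enableCheckpointing(long intervalMs) {
        checkpointCfg.intervalMs = intervalMs;
        return this;
    }

    SketchCheckpointConfig getCheckpointConfig() {
        return checkpointCfg;
    }

    SketchStreamGraph getStreamGraph() {
        return new SketchGraphGenerator(checkpointCfg).generate();
    }
}
```

Calling `new SketchEnv().enableCheckpointing(3000L).getStreamGraph().getCheckpointConfig()` returns a config whose interval is the value set on the environment, which is the property the walkthrough relies on.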
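Now let's go back to StreamExecutionEnvironment's execute method and see how the checkpoint configuration is carried over while the JobGraph is built. Following env.execute() through a series of calls, we find that the method that produces the JobGraph is streamGraph.getJobGraph().
That method calls StreamingJobGraphGenerator.createJobGraph, which first creates a StreamingJobGraphGenerator and then calls its createJobGraph method.
Let's look at createJobGraph in detail: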
- private JobGraph createJobGraph() {
- preValidate(); // pre-validate settings, e.g. whether checkpointing is enabled
- jobGraph.setJobType(streamGraph.getJobType());
-
- jobGraph.enableApproximateLocalRecovery(
- streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
-
- // Generate deterministic hashes for the nodes in order to identify them across
- // submission iff they didn't change.
- Map<Integer, byte[]> hashes =
- defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
-
- // Generate legacy version hashes for backwards compatibility
- List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
- for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
- legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
- }
-
- setChaining(hashes, legacyHashes);
-
- setPhysicalEdges();
-
- setSlotSharingAndCoLocation();
-
- setManagedMemoryFraction(
- Collections.unmodifiableMap(jobVertices),
- Collections.unmodifiableMap(vertexConfigs),
- Collections.unmodifiableMap(chainedConfigs),
- id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
- id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());
-
- // configure checkpointing
- configureCheckpointing();
-
- // set the savepoint restore settings
- jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
-
- final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
- JobGraphUtils.prepareUserArtifactEntries(
- streamGraph.getUserArtifacts().stream()
- .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
- jobGraph.getJobID());
-
- for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
- distributedCacheEntries.entrySet()) {
- jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
- }
-
- // set the ExecutionConfig last when it has been finalized
- try {
- jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
- } catch (IOException e) {
- throw new IllegalConfigurationException(
- "Could not serialize the ExecutionConfig."
- + "This indicates that non-serializable types (like custom serializers) were registered");
- }
-
- return jobGraph;
- }
preValidate checks, among other things, whether checkpointing is enabled and whether unaligned checkpoints are supported. The StreamGraph's checkpoint settings are actually handed to the JobGraph in configureCheckpointing():
- private void configureCheckpointing() {
- CheckpointConfig cfg = streamGraph.getCheckpointConfig();
-
- long interval = cfg.getCheckpointInterval();
- if (interval < MINIMAL_CHECKPOINT_TIME) {
- // interval of max value means disable periodic checkpoint
- interval = Long.MAX_VALUE;
- }
-
- // --- configure options ---
-
- CheckpointRetentionPolicy retentionAfterTermination;
- if (cfg.isExternalizedCheckpointsEnabled()) {
- CheckpointConfig.ExternalizedCheckpointCleanup cleanup =
- cfg.getExternalizedCheckpointCleanup();
- // Sanity check
- if (cleanup == null) {
- throw new IllegalStateException(
- "Externalized checkpoints enabled, but no cleanup mode configured.");
- }
- retentionAfterTermination =
- cleanup.deleteOnCancellation()
- ? CheckpointRetentionPolicy.RETAIN_ON_FAILURE
- : CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION;
- } else {
- retentionAfterTermination = CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
- }
-
- // --- configure the master-side checkpoint hooks ---
-
- final ArrayList<MasterTriggerRestoreHook.Factory> hooks = new ArrayList<>();
-
- for (StreamNode node : streamGraph.getStreamNodes()) {
- if (node.getOperatorFactory() instanceof UdfStreamOperatorFactory) {
- Function f =
- ((UdfStreamOperatorFactory) node.getOperatorFactory()).getUserFunction();
-
- if (f instanceof WithMasterCheckpointHook) {
- hooks.add(
- new FunctionMasterCheckpointHookFactory(
- (WithMasterCheckpointHook<?>) f));
- }
- }
- }
-
- // because the hooks can have user-defined code, they need to be stored as
- // eagerly serialized values
- final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks;
- if (hooks.isEmpty()) {
- serializedHooks = null;
- } else {
- try {
- MasterTriggerRestoreHook.Factory[] asArray =
- hooks.toArray(new MasterTriggerRestoreHook.Factory[hooks.size()]);
- serializedHooks = new SerializedValue<>(asArray);
- } catch (IOException e) {
- throw new FlinkRuntimeException("Trigger/restore hook is not serializable", e);
- }
- }
-
- // because the state backend can have user-defined code, it needs to be stored as
- // eagerly serialized value
- final SerializedValue<StateBackend> serializedStateBackend;
- if (streamGraph.getStateBackend() == null) {
- serializedStateBackend = null;
- } else {
- try {
- serializedStateBackend =
- new SerializedValue<StateBackend>(streamGraph.getStateBackend());
- } catch (IOException e) {
- throw new FlinkRuntimeException("State backend is not serializable", e);
- }
- }
-
- // because the checkpoint storage can have user-defined code, it needs to be stored as
- // eagerly serialized value
- final SerializedValue<CheckpointStorage> serializedCheckpointStorage;
- if (streamGraph.getCheckpointStorage() == null) {
- serializedCheckpointStorage = null;
- } else {
- try {
- serializedCheckpointStorage =
- new SerializedValue<>(streamGraph.getCheckpointStorage());
- } catch (IOException e) {
- throw new FlinkRuntimeException("Checkpoint storage is not serializable", e);
- }
- }
-
- // --- done, put it all together ---
- // create a JobCheckpointingSettings that wraps the checkpoint configuration
- JobCheckpointingSettings settings =
- new JobCheckpointingSettings(
- CheckpointCoordinatorConfiguration.builder()
- .setCheckpointInterval(interval)
- .setCheckpointTimeout(cfg.getCheckpointTimeout())
- .setMinPauseBetweenCheckpoints(cfg.getMinPauseBetweenCheckpoints())
- .setMaxConcurrentCheckpoints(cfg.getMaxConcurrentCheckpoints())
- .setCheckpointRetentionPolicy(retentionAfterTermination)
- .setExactlyOnce(
- getCheckpointingMode(cfg) == CheckpointingMode.EXACTLY_ONCE)
- .setPreferCheckpointForRecovery(cfg.isPreferCheckpointForRecovery())
- .setTolerableCheckpointFailureNumber(
- cfg.getTolerableCheckpointFailureNumber())
- .setUnalignedCheckpointsEnabled(cfg.isUnalignedCheckpointsEnabled())
- .setAlignmentTimeout(cfg.getAlignmentTimeout().toMillis())
- .build(),
- serializedStateBackend,
- serializedCheckpointStorage,
- serializedHooks);
- // pass the JobCheckpointingSettings to the JobGraph
- jobGraph.setSnapshotSettings(settings);
- }
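The comments in configureCheckpointing() say that hooks, state backend, and checkpoint storage are stored as "eagerly serialized values" because they may contain user-defined code. As a rough illustration of what eager serialization means, here is a minimal sketch using plain JDK serialization. `EagerSerializationSketch` is a hypothetical helper, not Flink's SerializedValue, and unlike the deserializeValue(classLoader) calls in the walkthrough it does not take a user class loader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper for illustration; not Flink's SerializedValue.
final class EagerSerializationSketch {

    // Turns the object into bytes immediately, so a non-serializable value
    // fails here (on the client) rather than later on the cluster.
    static byte[] serialize(Serializable value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException("Value is not serializable", e);
        }
    }

    // Rebuilds the object from the bytes on the receiving side.
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException("Could not read serialized value", e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class of serialized value not found", e);
        }
    }
}
```

The design point is that only a byte array travels with the job description, so the receiving side needs the user's classes only at the moment it deserializes.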
This method reads the StreamGraph's CheckpointConfig, creates a JobCheckpointingSettings, and copies the relevant checkpoint settings into it. The JobGraph member variable that holds the checkpoint settings is snapshotSettings.
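Two decisions inside configureCheckpointing() are easy to miss: an interval below the minimum disables periodic checkpoints, and the externalized-checkpoint cleanup mode is mapped to a retention policy. The sketch below mirrors that logic with hypothetical enums and a hypothetical `CheckpointSettingsSketch` class. The threshold constant is an assumption, since the value of MINIMAL_CHECKPOINT_TIME is not shown in the code above.

```java
// Hypothetical, simplified mirror of the decisions in configureCheckpointing(); not Flink code.
enum SketchCleanup { DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION }

enum SketchRetention { NEVER_RETAIN_AFTER_TERMINATION, RETAIN_ON_FAILURE, RETAIN_ON_CANCELLATION }

final class CheckpointSettingsSketch {
    // Assumed threshold for this sketch only.
    static final long MINIMAL_CHECKPOINT_TIME_MS = 10L;

    // An interval below the minimum is replaced by Long.MAX_VALUE,
    // which means "no periodic checkpoints".
    static long effectiveInterval(long configuredIntervalMs) {
        return configuredIntervalMs < MINIMAL_CHECKPOINT_TIME_MS
                ? Long.MAX_VALUE
                : configuredIntervalMs;
    }

    // Maps "externalized checkpoints enabled + cleanup mode" to a retention policy.
    static SketchRetention retention(boolean externalizedEnabled, SketchCleanup cleanup) {
        if (!externalizedEnabled) {
            return SketchRetention.NEVER_RETAIN_AFTER_TERMINATION;
        }
        if (cleanup == null) {
            throw new IllegalStateException(
                    "Externalized checkpoints enabled, but no cleanup mode configured.");
        }
        // Deleting on cancellation still keeps the checkpoint if the job fails.
        return cleanup == SketchCleanup.DELETE_ON_CANCELLATION
                ? SketchRetention.RETAIN_ON_FAILURE
                : SketchRetention.RETAIN_ON_CANCELLATION;
    }
}
```

With the settings from the job at the top of the article (3000 ms interval, RETAIN_ON_CANCELLATION), this sketch yields an unchanged interval of 3000 and the RETAIN_ON_CANCELLATION policy.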
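Both of the steps above run on the client. In a YARN cluster environment, once the JobGraph has been generated, the client creates a YARN client to submit the job; after submission, the JobSubmitHandler of the WebMonitorEndpoint calls its handleRequest method to process the submitted job.
The ExecutionGraph is built from the JobGraph when the JobMaster starts. Because starting the JobMaster involves many steps, we go through it in more detail here, beginning at the point where the cluster receives the JobGraph submitted by the client.
handleRequest mainly retrieves the JobGraph, the jar files, and the related artifacts sent by the client, and then calls dispatcherGateway.submitJob to submit the JobGraph: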
- @Override
- protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
- final Collection<File> uploadedFiles = request.getUploadedFiles();
- final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
- File::getName,
- Path::fromLocalFile
- ));
-
- if (uploadedFiles.size() != nameToFile.size()) {
- throw new RestHandlerException(
- String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
- uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
- nameToFile.size(),
- uploadedFiles.size()),
- HttpResponseStatus.BAD_REQUEST
- );
- }
-
- final JobSubmitRequestBody requestBody = request.getRequestBody();
-
- if (requestBody.jobGraphFileName == null) {
- throw new RestHandlerException(
- String.format("The %s field must not be omitted or be null.",
- JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
- HttpResponseStatus.BAD_REQUEST);
- }
-
- // load the JobGraph
- CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);
-
- // collect the jar files
- Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);
- // collect the additional artifacts the job depends on
- Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);
-
- CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);
-
- // call submitJob on the DispatcherGateway
- CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
-
- return jobSubmissionFuture.thenCombine(jobGraphFuture,
- (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
- }
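submitJob is declared on the DispatcherGateway interface. Dispatcher implements it, and MiniDispatcher (a Dispatcher subclass used for per-job clusters) overrides it; here we follow Dispatcher's submitJob. It performs a few checks on the JobGraph, such as whether the job ID is a duplicate and whether resources are only partially configured, and then calls internalSubmitJob: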
- @Override
- public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
- log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());
-
- try {
- if (isDuplicateJob(jobGraph.getJobID())) {
- return FutureUtils.completedExceptionally(
- new DuplicateJobSubmissionException(jobGraph.getJobID()));
- } else if (isPartialResourceConfigured(jobGraph)) {
- return FutureUtils.completedExceptionally(
- new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have " +
- "resources configured. The limitation will be removed in future versions."));
- } else {
- return internalSubmitJob(jobGraph);
- }
- } catch (FlinkException e) {
- return FutureUtils.completedExceptionally(e);
- }
- }
internalSubmitJob calls persistAndRunJob to persist and run the job, and then handles the outcome:
- private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
- log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
-
- final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJob(jobGraph.getJobID(), jobGraph,
- this::persistAndRunJob)
- .thenApply(ignored -> Acknowledge.get());
-
- return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
- if (throwable != null) {
- cleanUpJobData(jobGraph.getJobID(), true);
-
- ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(throwable);
- final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
- log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
- throw new CompletionException(
- new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", strippedThrowable));
- } else {
- return acknowledge;
- }
- }, ioExecutor);
- }
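The pattern in internalSubmitJob (run an action, map success to an acknowledgement, and turn any failure into a wrapped submission error) can be sketched with the JDK's CompletableFuture alone. `SubmitFutureSketch`, `persistAndRun`, and the string "ACK" are hypothetical names for this sketch; the real method additionally waits for a terminating job, cleans up job data, and runs the handler on an I/O executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical sketch of the future chaining used in internalSubmitJob; not Flink code.
final class SubmitFutureSketch {

    // Stand-in for persistAndRunJob: fails on demand.
    static void persistAndRun(String jobId, boolean fail) {
        if (fail) {
            throw new IllegalStateException("could not persist " + jobId);
        }
    }

    static CompletableFuture<String> submit(String jobId, boolean fail) {
        return CompletableFuture
                .runAsync(() -> persistAndRun(jobId, fail))
                .thenApply(ignored -> "ACK")
                .handle((ack, throwable) -> {
                    if (throwable != null) {
                        // Strip the CompletionException wrapper added by the future machinery.
                        Throwable cause =
                                throwable instanceof CompletionException && throwable.getCause() != null
                                        ? throwable.getCause()
                                        : throwable;
                        throw new CompletionException(
                                new RuntimeException("Failed to submit job " + jobId, cause));
                    }
                    return ack;
                });
    }
}
```

A caller that joins the returned future either receives the acknowledgement or a CompletionException whose cause carries the submission error, with the original failure one level below.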
jobGraphWriter persists the JobGraph; persistAndRunJob then calls runJob:
- private void persistAndRunJob(JobGraph jobGraph) throws Exception {
- jobGraphWriter.putJobGraph(jobGraph);
- runJob(jobGraph, ExecutionType.SUBMISSION);
- }
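runJob mainly does the following:
1. Calls createJobManagerRunner to create and start the JobMaster
2. Handles the job's result
3. Once the job has finished or failed, cleans up the job's state and removes the job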
- private void runJob(JobGraph jobGraph, ExecutionType executionType) {
- Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
- long initializationTimestamp = System.currentTimeMillis();
- /*TODO create and start the JobMaster*/
- CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph, initializationTimestamp);
-
- DispatcherJob dispatcherJob = DispatcherJob.createFor(
- jobManagerRunnerFuture,
- jobGraph.getJobID(),
- jobGraph.getName(),
- initializationTimestamp);
- runningJobs.put(jobGraph.getJobID(), dispatcherJob);
-
- final JobID jobId = jobGraph.getJobID();
- // handle the job's result
- final CompletableFuture<CleanupJobState> cleanupJobStateFuture = dispatcherJob.getResultFuture().handleAsync(
- (dispatcherJobResult, throwable) -> {
- Preconditions.checkState(runningJobs.get(jobId) == dispatcherJob, "The job entry in runningJobs must be bound to the lifetime of the DispatcherJob.");
-
- if (dispatcherJobResult != null) {
- return handleDispatcherJobResult(jobId, dispatcherJobResult, executionType);
- } else {
- return dispatcherJobFailed(jobId, throwable);
- }
- }, getMainThreadExecutor());
- // once the job has terminated, clean up its state and then remove the job
- final CompletableFuture<Void> jobTerminationFuture = cleanupJobStateFuture
- .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState))
- .thenCompose(Function.identity());
-
- FutureUtils.assertNoException(jobTerminationFuture);
- registerDispatcherJobTerminationFuture(jobId, jobTerminationFuture);
- }
createJobManagerRunner calls JobManagerRunnerFactory.createJobManagerRunner() to create the JobManagerRunner, which wraps the JobMaster, and then calls start() to launch it:
- CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp) {
- final RpcService rpcService = getRpcService();
- return CompletableFuture.supplyAsync(
- () -> {
- try {
- /*TODO create the JobMaster*/
- JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
- jobGraph,
- configuration,
- rpcService,
- highAvailabilityServices,
- heartbeatServices,
- jobManagerSharedServices,
- new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
- fatalErrorHandler,
- initializationTimestamp);
- /*TODO start the JobMaster*/
- runner.start();
- return runner;
- } catch (Exception e) {
- throw new CompletionException(new JobInitializationException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
- }
- },
- ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on JobManager creation
- }
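Starting the JobMaster is fairly involved and passes through many steps; the less important ones are skipped here:
1. JobManagerRunnerFactory # createJobManagerRunner
2. DefaultJobManagerRunnerFactory # createJobManagerRunner
3. JobManagerRunnerImpl # JobManagerRunnerImpl
4. DefaultJobMasterServiceFactory # createJobMasterService
5. JobMaster # JobMaster ==> createScheduler
6. DefaultSchedulerFactory # createInstance
7. SchedulerBase # createAndRestoreExecutionGraph
createAndRestoreExecutionGraph calls createExecutionGraph to build the ExecutionGraph and then obtains the graph's CheckpointCoordinator, which handles checkpoint-related work for the job. If a coordinator exists, it first tries to restore from the latest checkpoint and falls back to the configured savepoint otherwise: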
- private ExecutionGraph createAndRestoreExecutionGraph(
- JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
- ShuffleMaster<?> shuffleMaster,
- JobMasterPartitionTracker partitionTracker,
- ExecutionDeploymentTracker executionDeploymentTracker,
- long initializationTimestamp) throws Exception {
-
- ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker, executionDeploymentTracker, initializationTimestamp);
-
- final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();
-
- if (checkpointCoordinator != null) {
- // check whether we find a valid checkpoint
- if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
- new HashSet<>(newExecutionGraph.getAllVertices().values()))) {
-
- // check whether we can restore from a savepoint
- tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
- }
- }
-
- return newExecutionGraph;
- }
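The restore order in createAndRestoreExecutionGraph (latest checkpoint first, savepoint only as a fallback) can be reduced to a small decision function. `RestoreOrderSketch` and its string results are hypothetical and exist only to make the precedence explicit; the real code restores state into the ExecutionGraph instead of returning a label.

```java
import java.util.Optional;

// Hypothetical sketch of the restore precedence; not Flink code.
final class RestoreOrderSketch {

    // Returns a label describing where state would be restored from.
    static String chooseRestoreSource(Optional<String> latestCheckpoint,
                                      Optional<String> savepointPath) {
        // 1. A valid checkpoint found by the coordinator always wins.
        if (latestCheckpoint.isPresent()) {
            return "checkpoint:" + latestCheckpoint.get();
        }
        // 2. Otherwise fall back to the savepoint from the restore settings, if any.
        if (savepointPath.isPresent()) {
            return "savepoint:" + savepointPath.get();
        }
        // 3. Nothing to restore: the job starts with empty state.
        return "fresh-start";
    }
}
```

One consequence of this ordering is that a savepoint passed at submission time is ignored once a checkpoint for the job can be found, for example after a JobMaster failover.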
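8. Checkpoint configuration in the ExecutionGraph
SchedulerBase#createExecutionGraph internally calls ExecutionGraphBuilder#buildGraph.
This method does a lot of checkpoint-related work, for example:
1. Reads the checkpoint settings from the JobGraph
2. Creates the CheckpointIDCounter
3. Loads the state backend
4. Instantiates the user-defined checkpoint hooks
5. Passes the checkpoint parameters into the ExecutionGraph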
- public static ExecutionGraph buildGraph(
- @Nullable ExecutionGraph prior,
- JobGraph jobGraph,
- Configuration jobManagerConfig,
- ScheduledExecutorService futureExecutor,
- Executor ioExecutor,
- SlotProvider slotProvider,
- ClassLoader classLoader,
- CheckpointRecoveryFactory recoveryFactory,
- Time rpcTimeout,
- RestartStrategy restartStrategy,
- MetricGroup metrics,
- BlobWriter blobWriter,
- Time allocationTimeout,
- Logger log,
- ShuffleMaster<?> shuffleMaster,
- JobMasterPartitionTracker partitionTracker,
- FailoverStrategy.Factory failoverStrategyFactory,
- ExecutionDeploymentListener executionDeploymentListener,
- ExecutionStateUpdateListener executionStateUpdateListener,
- long initializationTimestamp) throws JobExecutionException, JobException {
-
-
-
- // ... (this part is omitted) ...
-
- // configure the state checkpointing
- // get the JobGraph's checkpoint settings
- JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
- if (snapshotSettings != null) {
- List<ExecutionJobVertex> triggerVertices =
- idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
-
- List<ExecutionJobVertex> ackVertices =
- idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);
-
- List<ExecutionJobVertex> confirmVertices =
- idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);
-
- CompletedCheckpointStore completedCheckpoints;
- CheckpointIDCounter checkpointIdCounter;
- try {
- int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
- CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);
-
- if (maxNumberOfCheckpointsToRetain <= 0) {
- // warning and use 1 as the default value if the setting in
- // state.checkpoints.max-retained-checkpoints is not greater than 0.
- log.warn("The setting for '{} : {}' is invalid. Using default value of {}",
- CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),
- maxNumberOfCheckpointsToRetain,
- CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());
-
- maxNumberOfCheckpointsToRetain = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
- }
-
- completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);
- checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);
- }
- catch (Exception e) {
- throw new JobExecutionException(jobId, "Failed to initialize high-availability checkpoint handler", e);
- }
-
- // Maximum number of remembered checkpoints
- int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
-
- CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
- historySize,
- ackVertices,
- snapshotSettings.getCheckpointCoordinatorConfiguration(),
- metrics);
-
- // load the state backend from the application settings
- final StateBackend applicationConfiguredBackend;
- final SerializedValue<StateBackend> serializedAppConfigured = snapshotSettings.getDefaultStateBackend();
-
- if (serializedAppConfigured == null) {
- applicationConfiguredBackend = null;
- }
- else {
- try {
- applicationConfiguredBackend = serializedAppConfigured.deserializeValue(classLoader);
- } catch (IOException | ClassNotFoundException e) {
- throw new JobExecutionException(jobId,
- "Could not deserialize application-defined state backend.", e);
- }
- }
- // resolve the state backend (application setting, cluster config, or default)
-
- final StateBackend rootBackend;
- try {
- rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(
- applicationConfiguredBackend, jobManagerConfig, classLoader, log);
- }
- catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
- throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
- }
-
- // instantiate the user-defined checkpoint hooks
-
- final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks = snapshotSettings.getMasterHooks();
- final List<MasterTriggerRestoreHook<?>> hooks;
-
- if (serializedHooks == null) {
- hooks = Collections.emptyList();
- }
- else {
- final MasterTriggerRestoreHook.Factory[] hookFactories;
- try {
- hookFactories = serializedHooks.deserializeValue(classLoader);
- }
- catch (IOException | ClassNotFoundException e) {
- throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);
- }
-
- final Thread thread = Thread.currentThread();
- final ClassLoader originalClassLoader = thread.getContextClassLoader();
- thread.setContextClassLoader(classLoader);
-
- try {
- hooks = new ArrayList<>(hookFactories.length);
- for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
- hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
- }
- }
- finally {
- thread.setContextClassLoader(originalClassLoader);
- }
- }
-
- final CheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();
-
- // pass the checkpoint configuration into the ExecutionGraph
- executionGraph.enableCheckpointing(
- chkConfig,
- triggerVertices,
- ackVertices,
- confirmVertices,
- hooks,
- checkpointIdCounter,
- completedCheckpoints,
- rootBackend,
- checkpointStatsTracker);
- }
-
- // create all the metrics for the Execution Graph
-
- metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
- metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
- metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));
-
- executionGraph.getFailoverStrategy().registerMetrics(metrics);
-
- return executionGraph;
- }
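buildGraph validates the maximum number of retained checkpoints and falls back to the default when the configured value is not positive. The sketch below shows that fallback together with a simplified idea of what "retained" means: only the most recent N completed checkpoints are kept. `RetainedCheckpointsSketch` is hypothetical and not Flink's CompletedCheckpointStore, and the default of 1 is an assumption taken from the "use 1 as the default value" comment in the code above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model; not Flink's CompletedCheckpointStore.
final class RetainedCheckpointsSketch {
    // Assumed default for this sketch.
    static final int DEFAULT_MAX_RETAINED = 1;

    private final int maxRetained;
    private final Deque<Long> completed = new ArrayDeque<>();

    RetainedCheckpointsSketch(int configuredMaxRetained) {
        // Non-positive values are invalid and fall back to the default.
        this.maxRetained =
                configuredMaxRetained <= 0 ? DEFAULT_MAX_RETAINED : configuredMaxRetained;
    }

    // Records a completed checkpoint and drops the oldest ones beyond the limit.
    void addCompleted(long checkpointId) {
        completed.addLast(checkpointId);
        while (completed.size() > maxRetained) {
            completed.removeFirst();
        }
    }

    List<Long> retained() {
        return new ArrayList<>(completed);
    }

    int maxRetained() {
        return maxRetained;
    }
}
```

For example, a store created with a limit of 2 that receives checkpoints 1, 2, and 3 keeps only 2 and 3.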
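At this point the checkpoint configuration has reached the ExecutionGraph, and a CheckpointCoordinator has been created to handle checkpoint-related work. After the JobMaster starts, it begins scheduling, allocates resources to the tasks, and deploys them; the actual checkpoints are then taken while the job runs.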