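The complete Checkpoint flow is shown in the figure above:
The rest of this article walks through the flow step by step.
When the JobMaster converts the JobGraph into an ExecutionGraph, it creates a CheckpointCoordinator for the ExecutionGraph if Checkpoint is enabled.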
DefaultExecutionGraphBuilder.buildGraph // converts the JobGraph into an ExecutionGraph
DefaultExecutionGraph::new
DefaultExecutionGraph::attachJobGraph // creates the ExecutionJobVertex instances
DefaultExecutionTopology.fromExecutionGraph // creates the ExecutionTopology
DefaultExecutionGraph::enableCheckpointing // creates the CheckpointCoordinator
DefaultExecutionGraph::createCheckpointPlanCalculator // creates the DefaultCheckpointPlanCalculator
CheckpointCoordinator::new
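CheckpointCoordinator wraps the StateBackend and the CheckpointStorage.
StateBackend is responsible for managing state:
CheckpointStorage is responsible for storing the state that the StateBackend manages:
When a SubtaskCheckpointCoordinatorImpl is constructed for a StreamTask, it calls:
CheckpointStorage::createCheckpointStorage
This creates the CheckpointStorageAccess that is used to resolve the state storage location when a Checkpoint runs.
When taking a state snapshot, CheckpointCoordinator calls:
CheckpointStorageAccess::resolveCheckpointStorageLocation
This produces the CheckpointStreamFactory, which creates the streams used to read and write state data.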
Once the job status transitions to RUNNING, the JobMaster sends TriggerCheckpoint to the SourceTasks through the CheckpointCoordinator:
JobMaster::start // the RPC server starts
JobMaster::onStart
JobMaster::startJobExecution
JobMaster::startJobMasterServices // obtains the RM address and connects to the RM
JobMaster::startScheduling
SchedulerBase::startScheduling
DefaultScheduler::startSchedulingInternal
SchedulerBase::transitionToRunning
DefaultExecutionGraph::transitionToRunning // notifies the ExecutionGraph listeners of the status change
CheckpointCoordinatorDeActivator::jobStatusChanges // starts checkpointing
CheckpointCoordinator::startCheckpointScheduler
CheckpointCoordinator::scheduleTriggerWithDelay // triggers a Checkpoint periodically on a timer
CheckpointCoordinator::triggerCheckpoint
CheckpointCoordinator::startTriggeringCheckpoint
DefaultCheckpointPlanCalculator::calculateCheckpointPlan // the plan singles out the SourceTasks as the entry point for triggering the Checkpoint
CheckpointCoordinator::createPendingCheckpoint
CheckpointCoordinator::triggerCheckpointRequest
CheckpointCoordinator::triggerTasks
Execution::triggerCheckpoint // sends a TriggerCheckpoint request to every SourceTask
Execution::triggerCheckpointHelper
TaskManagerGateway::triggerCheckpoint // sends the RPC to the TaskExecutor
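The core idea of the trigger path can be sketched in plain Java. This is a toy model under stated assumptions, not Flink's implementation: TaskRef, Plan and CheckpointTriggerSketch are hypothetical names, the RPC is simulated by appending to a list, and the timer that would call triggerCheckpoint at the configured interval is left out. It only shows that the plan sends the trigger to the source tasks while still expecting an acknowledgement from every task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: none of these classes are Flink classes.
final class CheckpointTriggerSketch {

    /** Hypothetical stand-in for one subtask of the job. */
    static final class TaskRef {
        final String name;
        final boolean isSource;

        TaskRef(String name, boolean isSource) {
            this.name = name;
            this.isSource = isSource;
        }
    }

    /** Simplified plan: who receives the trigger RPC and who must acknowledge. */
    static final class Plan {
        final List<TaskRef> tasksToTrigger = new ArrayList<>();
        final List<TaskRef> tasksToWaitFor = new ArrayList<>();
    }

    private long nextCheckpointId = 1;

    /** Records the simulated RPCs so the behaviour can be inspected. */
    final List<String> sentRpcs = new ArrayList<>();

    static Plan calculatePlan(List<TaskRef> allTasks) {
        Plan plan = new Plan();
        for (TaskRef task : allTasks) {
            if (task.isSource) {
                plan.tasksToTrigger.add(task); // only sources receive the RPC
            }
            plan.tasksToWaitFor.add(task); // every task must acknowledge
        }
        return plan;
    }

    /** One trigger round; a timer would call this at the configured interval. */
    long triggerCheckpoint(List<TaskRef> allTasks) {
        Plan plan = calculatePlan(allTasks);
        long checkpointId = nextCheckpointId++;
        for (TaskRef source : plan.tasksToTrigger) {
            sentRpcs.add("triggerCheckpoint(" + checkpointId + ") -> " + source.name);
        }
        return checkpointId;
    }

    public static void main(String[] args) {
        List<TaskRef> tasks = Arrays.asList(
                new TaskRef("source-0", true),
                new TaskRef("map-0", false),
                new TaskRef("sink-0", false));
        CheckpointTriggerSketch coordinator = new CheckpointTriggerSketch();
        coordinator.triggerCheckpoint(tasks);
        coordinator.triggerCheckpoint(tasks);
        System.out.println(coordinator.sentRpcs);
    }
}
```

Downstream tasks never appear in sentRpcs; in the real flow they learn about the Checkpoint from the barriers that the sources broadcast.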
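A SourceTask is triggered directly by an RPC from the JobMaster. When it runs the Checkpoint, it first broadcasts the CheckpointBarrier and then takes an asynchronous snapshot of its state: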
TaskExecutor::triggerCheckpoint
Task::triggerCheckpointBarrier
AbstractInvokable::triggerCheckpointAsync
SourceStreamTask::triggerCheckpointAsync
StreamTask::triggerCheckpointAsync
StreamTask::triggerCheckpointAsyncInMailbox
StreamTask::performCheckpoint
SubtaskCheckpointCoordinatorImpl::checkpointState
OperatorChain.broadcastEvent // broadcasts the CheckpointBarrier
CheckpointStorage::createCheckpointStorage // creates the CheckpointStorageAccess for the JobId
SubtaskCheckpointCoordinatorImpl::takeSnapshotSync
CheckpointStorageWorkerView::resolveCheckpointStorageLocation // the CheckpointStorageAccess creates the CheckpointStreamFactory
OperatorChain::snapshotState // for each Operator
RegularOperatorChain::buildOperatorSnapshotFutures
RegularOperatorChain::checkpointStreamOperator
AbstractStreamOperator::snapshotState
StreamOperatorStateHandler::snapshotState // calls snapshot on the Operator/Keyed backends
StateSnapshotContextSynchronousImpl::new
AbstractUdfStreamOperator::snapshotState // calls the UDF's snapshotState method, typically used to update OperatorState
DefaultOperatorStateBackend::snapshot
SnapshotStrategyRunner::snapshot
DefaultOperatorStateBackendSnapshotStrategy::syncPrepareResources // deep-copies the operator state so that the snapshot can proceed asynchronously
DefaultOperatorStateBackendSnapshotStrategy::asyncSnapshot // asynchronous snapshot
CheckpointStateOutputStream::closeAndGetHandle
OperatorStreamStateHandle::new // wraps the metadata and the data StreamStateHandle
HeapKeyedStateBackend::snapshot
SnapshotStrategyRunner::snapshot
HeapSnapshotStrategy::syncPrepareResources
HeapSnapshotStrategy::asyncSnapshot // asynchronous snapshot based on CopyOnWriteStateTable
CheckpointStateOutputStream::closeAndGetHandle
KeyGroupsStateHandle::new // wraps the KeyGroups and the data StreamStateHandle
SubtaskCheckpointCoordinatorImpl::finishAndReportAsync // sends the checkpoint result to the JobMaster
AsyncCheckpointRunnable::new
AsyncCheckpointRunnable::run
AsyncCheckpointRunnable::finalizeNonFinishedSnapshots
OperatorSnapshotFinalizer::new // waits until the TaskSnapshot state has been serialized
AsyncCheckpointRunnable::reportCompletedSnapshotStates
TaskStateManagerImpl::reportTaskStateSnapshots
RpcCheckpointResponder::acknowledgeCheckpoint // sends the Ack, carrying the state information, to the JobMaster
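The split between a synchronous preparation step and an asynchronous write, as in the syncPrepareResources / asyncSnapshot pair above, can be illustrated with a small plain-Java sketch. It is a simplified model under assumptions, not Flink code: AsyncSnapshotSketch is a hypothetical class, the state is a plain map, and the returned String stands in for a state handle. It models the deep-copy variant only; the copy-on-write variant used by the heap keyed backend is not shown.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Toy model only: not a Flink state backend.
final class AsyncSnapshotSketch {

    /** Live operator state, mutated by the task thread while records are processed. */
    private final Map<String, Integer> state = new HashMap<>();

    void update(String key, int value) {
        state.put(key, value);
    }

    /**
     * Synchronous phase: copy the state on the calling (task) thread.
     * Asynchronous phase: serialize the copy on another thread.
     * The returned String is a stand-in for a state handle.
     */
    CompletableFuture<String> snapshot() {
        // Sync part: a point-in-time copy, so later updates cannot leak in.
        final Map<String, Integer> copy = new TreeMap<>(state);
        // Async part: the expensive write works only on the copy.
        return CompletableFuture.supplyAsync(copy::toString);
    }

    public static void main(String[] args) {
        AsyncSnapshotSketch operator = new AsyncSnapshotSketch();
        operator.update("a", 1);
        CompletableFuture<String> pending = operator.snapshot();
        operator.update("a", 2); // processing continues while the snapshot is written
        System.out.println(pending.join());
    }
}
```

The update made after snapshot() returns is not part of the snapshot, which is the property that lets record processing resume before the write finishes.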
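After a StreamTask starts, it calls StreamTask::processInput to read and process data continuously. A non-source task triggers the Checkpoint once the CheckpointBarriers received from upstream are aligned: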
StreamTask::processInput
StreamOneInputProcessor::processInput
StreamTaskNetworkInput::emitNext(StreamTaskNetworkOutput)
AbstractStreamTaskNetworkInput::emitNext // loops, reading StreamElements from the buffer and processing them
CheckpointedInputGate::pollNext
CheckpointedInputGate::handleEvent
SingleCheckpointBarrierHandler::processBarrier
SingleCheckpointBarrierHandler::markCheckpointAlignedAndTransformState
WaitingForFirstBarrier::barrierReceived
AbstractAlignedBarrierHandlerState::barrierReceived
SingleCheckpointBarrierHandler.ControllerImpl::allBarriersReceived // checks whether the barriers are aligned
AbstractAlignedBarrierHandlerState::triggerGlobalCheckpoint
SingleCheckpointBarrierHandler.ControllerImpl::triggerGlobalCheckpoint
SingleCheckpointBarrierHandler::triggerCheckpoint
CheckpointBarrierHandler::notifyCheckpoint // triggers the StreamTask Checkpoint
StreamTask::triggerCheckpointOnBarrier
StreamTask::performCheckpoint // from here on the calls are the same as for a SourceTask
SubtaskCheckpointCoordinatorImpl::checkpointState
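As the call stacks show, a non-source StreamTask differs from a SourceTask only in when its Checkpoint is triggered: a SourceTask is triggered periodically by an RPC from the JobMaster, whereas a non-source task triggers the Checkpoint once the CheckpointBarriers from upstream are aligned. In both cases the final logic is the same: write the state of the current operators to the CheckpointStorage, then send an acknowledgement to the JobMaster.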
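The alignment condition that gates a non-source task can be sketched as follows. This is a minimal model under assumptions, not Flink's barrier handler: BarrierAlignmentSketch and its fields are hypothetical, channels are plain integers, and the blocking of channels that have already delivered their barrier is omitted. It only shows the "all barriers received" check.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: not Flink's SingleCheckpointBarrierHandler.
final class BarrierAlignmentSketch {

    private final int numChannels;
    private final Set<Integer> alignedChannels = new HashSet<>();
    private long currentCheckpointId = -1;

    /** Id of the last checkpoint for which all barriers arrived; -1 if none yet. */
    long lastTriggeredCheckpointId = -1;

    BarrierAlignmentSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /**
     * Handles a barrier arriving on one input channel.
     * Returns true only when this barrier completes the alignment,
     * which is the point where the task would take its own snapshot.
     */
    boolean onBarrier(int channel, long checkpointId) {
        if (checkpointId <= lastTriggeredCheckpointId || checkpointId < currentCheckpointId) {
            return false; // stale or duplicate barrier
        }
        if (checkpointId > currentCheckpointId) {
            // First barrier of a newer checkpoint: start a fresh alignment.
            currentCheckpointId = checkpointId;
            alignedChannels.clear();
        }
        alignedChannels.add(channel);
        if (alignedChannels.size() == numChannels) {
            lastTriggeredCheckpointId = checkpointId;
            alignedChannels.clear();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        BarrierAlignmentSketch handler = new BarrierAlignmentSketch(2);
        System.out.println(handler.onBarrier(0, 1)); // one channel still missing
        System.out.println(handler.onBarrier(1, 1)); // alignment complete
    }
}
```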
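The ACK that a StreamTask sends to the JobMaster carries the state metadata and a StreamStateHandle, which falls into the following kinds depending on where the state is stored:
After the JobMaster receives the acknowledgeCheckpoint from a StreamTask: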
JobMaster::acknowledgeCheckpoint
SchedulerBase::acknowledgeCheckpoint
ExecutionGraphHandler::acknowledgeCheckpoint
CheckpointCoordinator::receiveAcknowledgeMessage
PendingCheckpoint::acknowledgeTask // acknowledgement from a single Task
PendingCheckpoint::updateOperatorState // updates the SubTask state information
CheckpointCoordinator::completePendingCheckpoint // after all Tasks have acknowledged
PendingCheckpoint::finalizeCheckpoint
Checkpoints.storeCheckpointMetadata // stores the CheckpointMetadata
CompletedCheckpoint::new
CheckpointCoordinator::sendAcknowledgeMessages // notifies the Tasks that the Checkpoint has completed
ExecutionVertex::notifyCheckpointComplete
TaskManagerGateway.notifyCheckpointComplete
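Once the JobMaster has received the Checkpoint state information from all StreamTasks, the Checkpoint is considered complete. The JobMaster then notifies the StreamTasks that the Checkpoint has completed, so that SubTasks listening for Checkpoint completion can carry out follow-up actions.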
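The bookkeeping on the JobMaster side can be sketched in plain Java. This is a simplified model under assumptions, not Flink's PendingCheckpoint: PendingCheckpointSketch is a hypothetical class, tasks are identified by name, and a String stands in for the reported state handle. It shows that the checkpoint can be finalized only after every expected task has acknowledged.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy model only: not Flink's PendingCheckpoint.
final class PendingCheckpointSketch {

    private final long checkpointId;
    private final Set<String> notYetAcknowledged;
    private final Map<String, String> stateHandles = new TreeMap<>();

    PendingCheckpointSketch(long checkpointId, Collection<String> tasksToWaitFor) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasksToWaitFor);
    }

    /** Records one task's ack; returns false for unknown or duplicate acks. */
    boolean acknowledgeTask(String task, String stateHandle) {
        if (!notYetAcknowledged.remove(task)) {
            return false;
        }
        stateHandles.put(task, stateHandle);
        return true;
    }

    boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty();
    }

    /** Returns the collected handles; allowed only once all tasks have acknowledged. */
    Map<String, String> finalizeCheckpoint() {
        if (!isFullyAcknowledged()) {
            throw new IllegalStateException(
                    "checkpoint " + checkpointId + " still waits for " + notYetAcknowledged);
        }
        return Collections.unmodifiableMap(new TreeMap<>(stateHandles));
    }

    public static void main(String[] args) {
        PendingCheckpointSketch pending =
                new PendingCheckpointSketch(1, Arrays.asList("source-0", "map-0"));
        pending.acknowledgeTask("source-0", "handle-source");
        pending.acknowledgeTask("map-0", "handle-map");
        if (pending.isFullyAcknowledged()) {
            // At this point the coordinator would notify the tasks of completion.
            System.out.println(pending.finalizeCheckpoint());
        }
    }
}
```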