赞
踩
概述:上一篇文文章总体接收了flinkcheckpoint的源码分析的总体概念和流程。并结合代码介绍了checkpoint的发起和任务执行过程
详细参考:https://blog.csdn.net/weixin_40809627/article/details/108537480
本篇文章将接着上篇文章,继续介绍 flink checkPoint 的检查点快照、本地状态存储、checkpoint的确认、和状态恢复等过程。
在task 触发了chckpoint 之后,对于Task而言,最重要的就是将当前 Task 中所有算子的状态快照(state snapshot)储存到外部存储系统的。外部系统可能是一个分布式文件系统,也可能是JobManager内存中。
在 StreamTask.performCheckpoint 方法中,开始进行 checkpoint 操作,这里主要分为三部分:1)checkpoint的准备操作,这里通常不进行太多操作;2)发送 CheckpointBarrier;3)存储检查点快照:
class StreamTask {
private boolean performCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetrics checkpointMetrics,
boolean advanceToEndOfTime) throws Exception {
final long checkpointId = checkpointMetaData.getCheckpointId();
final boolean result;
synchronized (lock) {
if (isRunning) {
if (checkpointOptions.getCheckpointType().isSynchronous()) {
syncSavepointLatch.setCheckpointId(checkpointId);
if (advanceToEndOfTime) {
advanceToEndOfEventTime();
}
}
// All of the following steps happen as an atomic step from the perspective of barriers and
// records/watermarks/timers/callbacks.
// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
// checkpoint alignments
// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
// The pre-barrier work should be nothing or minimal in the common case.
operatorChain.prepareSnapshotPreBarrier(checkpointId);
// Step (2): Send the checkpoint barrier downstream
operatorChain.broadcastCheckpointBarrier(
checkpointId,
checkpointMetaData.getTimestamp(),
checkpointOptions);
// Step (3): Take the state snapshot. This should be largely asynchronous, to not
// impact progress of the streaming topology
checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
result = true;
}
else {
// we cannot perform our checkpoint - let the downstream operators know that they
// should not wait for any input from this operator
// we cannot broadcast the cancellation markers on the 'operator chain', because it may not
// yet be created
final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
Exception exception = null;
for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter : recordWriters) {
try {
recordWriter.broadcastEvent(message);
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(
new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
exception);
}
}
if (exception != null) {
throw exception;
}
result = false;
}
}
if (isRunning && syncSavepointLatch.isSet()) {
//保存 savepoint,等待 checkpoint 确认完成
final boolean checkpointWasAcked =
syncSavepointLatch.blockUntilCheckpointIsAcknowledged();
if (checkpointWasAcked) {
finishTask();
}
}
return result;
}
}
在介绍如何存储检查点快照之前,先了解下相关checkpoint 存储相关的一些类,简单地来说,CheckpointStorage是对状态存储系统地抽象,它有两个不同的实现,分别是MemoryBackendCheckpointStorage 和 FsCheckpointStorage。MemoryBackendCheckpointStorage 会将所有算子的检查点状态存储在 JobManager 的内存中,通常不适合在生产环境中使用;而 FsCheckpointStorage 则会把所有算子的检查点状态持久化存储在文件系统中。 CheckpointStorageLocation 是对检查点状态存储位置的一个抽象。它呢能过提供获取检查点输出流的方法,通过输出流将状态和元数据写入到存储系统中。输出流关闭时,可以获得状态句柄StateHandle),后面可以使用句柄重新读取写入的状态。
下面时执行状态快照主要逻辑
每个算个的快照被抽象为OperatorSnapshotFutures,包含了operator state 和 keyed state 的快照结果:
检查点快照的过程被封装为CheckpointingOperation,由于每一个StreamTask 可能包含多个算子,因而内部使用一个ap 维护 OperatorID -> OperatorSnapshotFutures 的关系。CheckpointingOperation 中,快照操作分为两个阶段,第一个阶段同步执行的,第二个阶段异步执行的。
class StreamTask {
private static final class CheckpointingOperation {
//OperatorID -> OperatorSnapshotFutures
private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;
//执行检查点快照
public void executeCheckpointing() throws Exception {
startSyncPartNano = System.nanoTime();
try {
//1. 同步执行的部分
for (StreamOperator<?> op : allOperators) {
checkpointStreamOperator(op);
}
//2. 异步执行的部分
// checkpoint 可以配置成同步执行,也可以配置成异步执行的
// 如果是同步执行的,在这里实际上所有的 runnable future 都是已经完成的状态
AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
owner,
operatorSnapshotsInProgress,
checkpointMetaData,
checkpointMetrics,
startAsyncPartNano);
owner.cancelables.registerCloseable(asyncCheckpointRunnable);
owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
} catch (Exception ex) {
........
}
}
@SuppressWarnings("deprecation")
private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
if (null != op) {
// 调用 StreamOperator.snapshotState 方法进行快照
// 返回的结果是 runnable future,可能是已经执行完了,也可能没有执行完
OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
checkpointMetaData.getCheckpointId(),
checkpointMetaData.getTimestamp(),
checkpointOptions,
storageLocation);
operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
}
}
}
}
在同步执行阶段,会依次调用每一个算子的StreamOperator.snapshotState,返回结果是一个 runnable future。根据 checkpoint 配置成同步模式和异步模式的区别,这个 future 可能处于完成状态,也可能处于未完成状态: 具体参考代码snapshotState
现在我们已经看到 checkpoint 操作是如何同用户自定义函数建立关联的了,接下来我们来看看由 Flink 托管的状态是如何写入存储系统的,即:
operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions); //写入 operator state
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions); //写入 keyed state
首先来看operator state。DefaultOperatorStateBackend 将实际的工作交给DefaultOperatorStateBackendSnapshotStrategy 完成。首先会对当前注册的所有的operator state(包含 list state 和 broadcast state)做深度拷贝,然后将实际的写入操作封装在一个异步的 FutureTask 中,这个 FutureTask 的主要任务包括: 1)、打开输出流 2)、写入状态元数据信息 3)、写入状态 4)、关闭输出流,获得状态句柄。如果不启动异步checkpoint模式,那么这个FutureTask 在同步阶段就会立刻执行。
keyed state 写入的基本流程和此相似,但由于keyed state 在存储时有多种实现,包括基于堆内存和RocksDB 的不同实现,此外基于 RocksDB 的实现还包括支持增量 checkpoint,因而相比于 operator state 要更复杂一些。另外,Flink 自 1.5.0 版本还引入了一个本地状态存储的优化,支持在 TaskManager 的本地保存一份 keyed state,试图优化状态恢复的速度和网络开销。
至此,我们介绍快照操作的第一个阶段,即同步执行的阶段。异步执行阶段被封装为AsyncCheckpointRunnable ,主要的操作包括 1)、执行同步阶段创建FutureTask 2)完成后向 CheckpointCoordinator 发送 Ack 响应。
class StreamTask {
protected static final class AsyncCheckpointRunnable implements Runnable, Closeable {
@Override
public void run() {
FileSystemSafetyNet.initializeSafetyNetForThread();
try {
TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
new TaskStateSnapshot(operatorSnapshotsInProgress.size());
TaskStateSnapshot localTaskOperatorSubtaskStates =
new TaskStateSnapshot(operatorSnapshotsInProgress.size());
// 完成每一个 operator 的状态写入
// 如果是同步 checkpoint,那么在此之前状态已经写入完成
// 如果是异步 checkpoint,那么在这里才会写入状态
for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
OperatorID operatorID = entry.getKey();
OperatorSnapshotFutures snapshotInProgress = entry.getValue();
// finalize the async part of all by executing all snapshot runnables
OperatorSnapshotFinalizer finalizedSnapshots =
new OperatorSnapshotFinalizer(snapshotInProgress);
jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
operatorID,
finalizedSnapshots.getJobManagerOwnedState());
localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
operatorID,
finalizedSnapshots.getTaskLocalState());
}
final long asyncEndNanos = System.nanoTime();
final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;
checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
//报告 snapshot 完成
reportCompletedSnapshotStates(
jobManagerTaskOperatorSubtaskStates,
localTaskOperatorSubtaskStates,
asyncDurationMillis);
} else {
LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
owner.getName(),
checkpointMetaData.getCheckpointId());
}
} catch (Exception e) {
handleExecutionException(e);
} finally {
owner.cancelables.unregisterCloseable(this);
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
}
}
}
private void reportCompletedSnapshotStates(
TaskStateSnapshot acknowledgedTaskStateSnapshot,
TaskStateSnapshot localTaskStateSnapshot,
long asyncDurationMillis) {
TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
boolean hasLocalState = localTaskStateSnapshot.hasState();
// we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
// to stateless tasks on restore. This enables simple job modifications that only concern
// stateless without the need to assign them uids to match their (always empty) states.
taskStateManager.reportTaskStateSnapshots(
checkpointMetaData,
checkpointMetrics,
hasAckState ? acknowledgedTaskStateSnapshot : null,
hasLocalState ? localTaskStateSnapshot : null);
}
}
public class TaskStateManagerImpl implements TaskStateManager {
@Override
public void reportTaskStateSnapshots(
@Nonnull CheckpointMetaData checkpointMetaData,
@Nonnull CheckpointMetrics checkpointMetrics,
@Nullable TaskStateSnapshot acknowledgedState,
@Nullable TaskStateSnapshot localState) {
long checkpointId = checkpointMetaData.getCheckpointId();
localStateStore.storeLocalState(checkpointId, localState);
//发送 ACK 响应给 CheckpointCoordinator
checkpointResponder.acknowledgeCheckpoint(
jobId,
executionAttemptID,
checkpointId,
checkpointMetrics,
acknowledgedState);
}
}
所谓本地状态存储,即在存储检查点快照时,在Task 所在的TaskManager 本地文件系统中存储一份副本,这样在进行状态恢复时可以优先从本地状态进行恢复,从而减少网络数据传输的开销。本地状态存储仅针对 keyed state,我们以较为简单的 HeapKeyedStateBackend 为例,看看本地状态存储时如何实现的
class HeapSnapshotStrategy<K>
extends AbstractSnapshotStrategy<KeyedStateHandle> implements SnapshotStrategySynchronicityBehavior<K> {
@Nonnull
@Override
public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
long checkpointId,
long timestamp,
@Nonnull CheckpointStreamFactory primaryStreamFactory,
@Nonnull CheckpointOptions checkpointOptions) throws IOException {
......
//创建 CheckpointStreamWithResultProvider
final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier =
localRecoveryConfig.isLocalRecoveryEnabled() ?
() -> CheckpointStreamWithResultProvider.createDuplicatingStream(
checkpointId,
CheckpointedStateScope.EXCLUSIVE,
primaryStreamFactory,
localRecoveryConfig.getLocalStateDirectoryProvider()) :
() -> CheckpointStreamWithResultProvider.createSimpleStream(
CheckpointedStateScope.EXCLUSIVE,
primaryStreamFactory);
........
}
}
其中关键的一点在于,根据是否启用本地状态恢复创建不同的
CheckpointStreamWithResultProvider。
public interface CheckpointStreamWithResultProvider extends Closeable {
@Nonnull
static CheckpointStreamWithResultProvider createSimpleStream(
@Nonnull CheckpointedStateScope checkpointedStateScope,
@Nonnull CheckpointStreamFactory primaryStreamFactory) throws IOException {
CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
}
@Nonnull
static CheckpointStreamWithResultProvider createDuplicatingStream(
@Nonnegative long checkpointId,
@Nonnull CheckpointedStateScope checkpointedStateScope,
@Nonnull CheckpointStreamFactory primaryStreamFactory,
@Nonnull LocalRecoveryDirectoryProvider secondaryStreamDirProvider) throws IOException {
CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
try {
File outFile = new File(
secondaryStreamDirProvider.subtaskSpecificCheckpointDirectory(checkpointId),
String.valueOf(UUID.randomUUID()));
Path outPath = new Path(outFile.toURI());
CheckpointStreamFactory.CheckpointStateOutputStream secondaryOut =
new FileBasedStateOutputStream(outPath.getFileSystem(), outPath);
//有两个输出流,primary 和 secondary,secondary 对应本地存储
return new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream(primaryOut, secondaryOut);
} catch (IOException secondaryEx) {
LOG.warn("Exception when opening secondary/local checkpoint output stream. " +
"Continue only with the primary stream.", secondaryEx);
}
return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
}
}
所以在启用本地状态存储的情况下,会创建两个输出流,其中primaryOut 对应外部存储,而secondaryOut 对应本地存储。状态会输出两份。本地状态句柄会存储在 TaskLocalStateStore 中。
1、Task 对checkpoint 的响应应时通过CheckpointResponder 接口完成的:
public interface CheckpointResponder {
/**
* Acknowledges the given checkpoint.
*/
void acknowledgeCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
CheckpointMetrics checkpointMetrics,
TaskStateSnapshot subtaskState);
/**
* Declines the given checkpoint.
*/
void declineCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
Throwable cause);
}
RpcCheckpointResponder 作为CheckpointResponder 的具体实现,主要是通过RPC调用通知CheckpointCoordinatorGateway,即通知给JobMaster,JobMaster调用CheckpointCoordinator.receiveAcknowledgeMessage() 和 CheckpointCoordinator.receiveDeclineMessage() 进行处理
2、确认完成
在一个Task 完成checkpoint 操作后,CheckpointCoordinator 接收到ACK响应,对ack的响应的处理流程主要如下:
(1)、根据Ack的checkpointID 从 Map<Long, PendingCheckpoint> pendingCheckpoints 中查找对应的 PendingCheckpoint 如果存在对应的PendingCheckpoint。这个PendingCheckpoint 美哟被丢弃,调用PendingCheckpoint.acknowledgeTask 方法处理 Ack,根据处理结果的不同:
1、如果返回success 则调用completePendingCheckpoint 完成此次 checkpoint
2、返回duplicate 表示Ack 消息重复接收,直接忽略。
3、返回unknown :未知的Ack ,清理上报的Ack中携带的状态句柄
4、discard:checkpoint 已经被discard,清理上报的 Ack 中携带的状态句柄
这个 PendingCheckpoint 已经被丢弃,抛出异常
若不存在对应的 PendingCheckpoint,则清理上报的 Ack 中携带的状态句柄
相应的代码如下:
class CheckpointCoordinator {
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
if (shutdown || message == null) {
return false;
}
if (!job.equals(message.getJob())) {
LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
return false;
}
final long checkpointId = message.getCheckpointId();
synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
// get races and invalid error log messages
if (shutdown) {
return false;
}
final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);
if (checkpoint != null && !checkpoint.isDiscarded()) {
switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
case SUCCESS:
LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
checkpointId, message.getTaskExecutionId(), message.getJob());
if (checkpoint.isFullyAcknowledged()) {
completePendingCheckpoint(checkpoint);
}
break;
case DUPLICATE:
LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
break;
case UNKNOWN:
LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
"because the task's execution attempt id was unknown. Discarding " +
"the state handle to avoid lingering state.", message.getCheckpointId(),
message.getTaskExecutionId(), message.getJob());
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
break;
case DISCARDED:
LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
"because the pending checkpoint had been discarded. Discarding the " +
"state handle tp avoid lingering state.",
message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
}
return true;
}
else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded but non-removed checkpoint " + checkpointId);
}
else {
boolean wasPendingCheckpoint;
// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
if (recentPendingCheckpoints.contains(checkpointId)) {
wasPendingCheckpoint = true;
LOG.warn("Received late message for now expired checkpoint attempt {} from " +
"{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
}
else {
LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.",
checkpointId, message.getTaskExecutionId(), message.getJob());
wasPendingCheckpoint = false;
}
// try to discard the state so that we don't have lingering state lying around
discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
return wasPendingCheckpoint;
}
}
}
}
对于一个已经触发但还没有完成的checkpoint,即PendingCheckpoint,它是如何处理Ack 消息的那,在pendingCheckpoint 内部维护了两个Map ,分别是:
Map<OperatorID, OperatorState> operatorStates; : 已经接收到 Ack 的算子的状态句柄
Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;: 需要 Ack 但还没有接收到的 Task
每当接收一个Ack 消息时,PendingCheckpoint 就从 notYetAcknowledgedTasks 中移除对应的 Task,并保存 Ack 携带的状态句柄保存。当 notYetAcknowledgedTasks 为空时,表明所有的 Ack 消息都接收到了。其中 OperatorState 是算子状态句柄的一层封装:
class OperatorState implements CompositeStateHandle {
/** handles to non-partitioned states, subtaskindex -> subtaskstate */
private final Map<Integer, OperatorSubtaskState> operatorSubtaskStates;
}
public class OperatorSubtaskState implements CompositeStateHandle {
/** Snapshot from the {@link org.apache.flink.runtime.state.OperatorStateBackend}. */
@Nonnull
private final StateObjectCollection<OperatorStateHandle> managedOperatorState;
/** Snapshot written using {@link org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream}. */
@Nonnull
private final StateObjectCollection<OperatorStateHandle> rawOperatorState;
/** Snapshot from {@link org.apache.flink.runtime.state.KeyedStateBackend}. */
@Nonnull
private final StateObjectCollection<KeyedStateHandle> managedKeyedState;
/** Snapshot written using {@link org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream}. */
@Nonnull
private final StateObjectCollection<KeyedStateHandle> rawKeyedState;
}
1、一旦PendingCheckpoint 确认所有Ack 消息都已经接收,那么就可以完成此次checkpoint了,具体包括:
调用PendingCheckpoint.finalizeCheckpoint() 将 PendingCheckpoint 转化为 CompletedCheckpoint。
获取 CheckpointMetadataOutputStream,将所有的状态句柄信息通过 CheckpointMetadataOutputStream 写入到存储系统中创建一个 CompletedCheckpoint 对象
2、将 CompletedCheckpoint 保存到 CompletedCheckpointStore 中
CompletedCheckpointStore 有两种实现,分别为 StandaloneCompletedCheckpointStore 和 ZooKeeperCompletedCheckpointStore
StandaloneCompletedCheckpointStore 简单地将 CompletedCheckpointStore 存放在一个数组中
ZooKeeperCompletedCheckpointStore 提供高可用实现:先将 CompletedCheckpointStore 写入到 RetrievableStateStorageHelper 中(通常是文件系统),然后将文件句柄存在 ZK 中
保存的 CompletedCheckpointStore 数量是有限的,会删除旧的快照
移除被越过的 PendingCheckpoint,因为 CheckpointID 是递增的,那么所有比当前完成的 CheckpointID 小的 PendingCheckpoint 都可以被丢弃了
依次调用 Execution.notifyCheckpointComplete() 通知所有的 Task 当前 Checkpoint 已经完成
通过 RPC 调用 TaskExecutor.confirmCheckpoint() 告知对应的 Task
拒绝:
在Task进行checkpoint 的过程,可能会发生异常导致checkpoint 失败,这种情况下会通过CheckpointResponder 发出回绝的消息。当CheckpointCoordinator 接收到 DeclineCheckpoint 消息后会移除 PendingCheckpoint,并尝试丢弃已经接收到的Ack消息中已经完成的状态句柄。
class CheckpointCoordinator {
public void receiveDeclineMessage(DeclineCheckpoint message) {
if (shutdown || message == null) {
return;
}
if (!job.equals(message.getJob())) {
throw new IllegalArgumentException("Received DeclineCheckpoint message for job " +
message.getJob() + " while this coordinator handles job " + job);
}
final long checkpointId = message.getCheckpointId();
final String reason = (message.getReason() != null ? message.getReason().getMessage() : "");
PendingCheckpoint checkpoint;
synchronized (lock) {
// we need to check inside the lock for being shutdown as well, otherwise we
// get races and invalid error log messages
if (shutdown) {
return;
}
checkpoint = pendingCheckpoints.remove(checkpointId);
if (checkpoint != null && !checkpoint.isDiscarded()) {
LOG.info("Decline checkpoint {} by task {} of job {}.", checkpointId, message.getTaskExecutionId(), job);
discardCheckpoint(checkpoint, message.getReason());
}
else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded but non-removed checkpoint " + checkpointId);
}
else if (LOG.isDebugEnabled()) {
if (recentPendingCheckpoints.contains(checkpointId)) {
// message is for an unknown checkpoint, or comes too late (checkpoint disposed)
LOG.debug("Received another decline message for now expired checkpoint attempt {} of job {} : {}",
checkpointId, job, reason);
} else {
// message is for an unknown checkpoint. might be so old that we don't even remember it any more
LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} of job {} : {}",
checkpointId, job, reason);
}
}
}
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。