
Flink Checkpoint Source Code Analysis: The Checkpoint Trigger Flow


Preface

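I have recently been reading through Flink's checkpoint mechanism for work. I am writing down what I learn along the way to share with everyone; these posts also serve as my own study notes.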

My company currently runs Flink 1.11, so the analysis below is based on version 1.11.

Before diving into the code, it helps to have a rough picture of how Flink checkpointing works.

Overview of the Flink checkpoint mechanism

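Flink's checkpointing is built on the Asynchronous Barrier Snapshotting (ABS) algorithm, proposed in the paper "Lightweight Asynchronous Snapshots for Distributed Dataflows". Once you understand this paper, you understand Flink's checkpoint mechanism. The paper is fairly easy to follow; below is a brief introduction to its main content and core algorithm.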

[1] Reference: "Flink Checkpoint原理解析" (Flink Checkpoint Principles Explained), Zhihu
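As I understand the paper, the heart of ABS is barrier alignment: a task with several inputs stops consuming an input once that input's barrier arrives, buffers whatever follows it, and takes its snapshot only when the barrier has arrived on every input. The Java below is a toy model of that idea, not Flink code. The classes `Element` and `AligningSumOperator` are hypothetical, the "state" is just a running sum, and the sketch assumes at most one checkpoint in flight with barriers arriving in checkpoint-id order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of ABS barrier alignment (hypothetical names, not Flink classes).
// Assumes one checkpoint in flight and barriers arriving in id order.
public class BarrierAlignmentSketch {

    /** An element travelling on a channel: a data record or a checkpoint barrier. */
    static final class Element {
        final boolean isBarrier;
        final long value; // record payload, or checkpoint id for a barrier

        private Element(boolean isBarrier, long value) {
            this.isBarrier = isBarrier;
            this.value = value;
        }

        static Element data(long v) { return new Element(false, v); }

        static Element barrier(long checkpointId) { return new Element(true, checkpointId); }
    }

    /** A downstream operator that sums its inputs and aligns barriers across channels. */
    static final class AligningSumOperator {
        private final int numChannels;
        private final Set<Integer> blockedChannels = new HashSet<>();
        private final List<Deque<Long>> buffered = new ArrayList<>();
        private long sum = 0;
        final List<Long> snapshots = new ArrayList<>(); // state captured per checkpoint
        final List<String> output = new ArrayList<>();  // what is emitted downstream

        AligningSumOperator(int numChannels) {
            this.numChannels = numChannels;
            for (int i = 0; i < numChannels; i++) {
                buffered.add(new ArrayDeque<>());
            }
        }

        void onElement(int channel, Element e) {
            if (e.isBarrier) {
                blockedChannels.add(channel); // stop consuming this channel
                if (blockedChannels.size() == numChannels) {
                    // Barriers from all inputs have arrived: forward the barrier and snapshot.
                    output.add("barrier-" + e.value);
                    snapshots.add(sum);
                    blockedChannels.clear();
                    // Unblock: replay records that arrived after the barrier on each channel.
                    for (Deque<Long> queue : buffered) {
                        while (!queue.isEmpty()) {
                            process(queue.poll());
                        }
                    }
                }
            } else if (blockedChannels.contains(channel)) {
                buffered.get(channel).add(e.value); // belongs to the next checkpoint: hold back
            } else {
                process(e.value);
            }
        }

        private void process(long v) {
            sum += v;
            output.add("record-" + v);
        }
    }

    public static void main(String[] args) {
        AligningSumOperator op = new AligningSumOperator(2);
        op.onElement(0, Element.data(1));
        op.onElement(0, Element.barrier(1)); // channel 0 is now blocked
        op.onElement(0, Element.data(100));  // buffered: not part of checkpoint 1
        op.onElement(1, Element.data(2));
        op.onElement(1, Element.barrier(1)); // aligned: snapshot the sum 1 + 2
        System.out.println(op.snapshots);    // [3]
        System.out.println(op.output);       // [record-1, record-2, barrier-1, record-100]
    }
}
```

The record `100` arrives on channel 0 after that channel's barrier, so it is held back and only counted after the snapshot; this is what keeps the snapshot consistent without stopping the whole job.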

Code analysis

Flink checkpoints are triggered by a periodic timer in CheckpointCoordinator:

```java
private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
    return timer.scheduleAtFixedRate(
            new ScheduledTrigger(),
            initDelay, baseInterval, TimeUnit.MILLISECONDS);
}
```
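The same fixed-rate pattern can be sketched with the JDK's `ScheduledExecutorService`. This is a minimal illustration, not Flink's implementation: `ToyCoordinator` and `triggerSafely` are hypothetical names, the JDK executor stands in for the coordinator's `timer`, and "triggering" only records a new checkpoint id instead of sending RPCs to tasks.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class PeriodicTriggerSketch {

    /** Hypothetical stand-in for CheckpointCoordinator's periodic trigger. */
    static final class ToyCoordinator {
        private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        private final AtomicLong checkpointIdCounter = new AtomicLong(1);
        private final long baseInterval;
        private final CountDownLatch triggered; // lets callers wait for N triggers
        final List<Long> triggeredIds = new CopyOnWriteArrayList<>();

        ToyCoordinator(long baseIntervalMillis, CountDownLatch triggered) {
            this.baseInterval = baseIntervalMillis;
            this.triggered = triggered;
        }

        ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
            return timer.scheduleAtFixedRate(
                    this::triggerSafely, initDelay, baseInterval, TimeUnit.MILLISECONDS);
        }

        private void triggerSafely() {
            try {
                // In Flink, this is roughly where trigger RPCs would go out to the source tasks.
                triggeredIds.add(checkpointIdCounter.getAndIncrement());
                triggered.countDown();
            } catch (RuntimeException e) {
                // An exception escaping a fixed-rate task suppresses all later runs, so catch it here.
                System.err.println("checkpoint trigger failed: " + e);
            }
        }

        void shutdown() {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch threeTriggers = new CountDownLatch(3);
        ToyCoordinator coordinator = new ToyCoordinator(50, threeTriggers);
        coordinator.scheduleTriggerWithDelay(10);
        boolean done = threeTriggers.await(5, TimeUnit.SECONDS);
        coordinator.shutdown();
        System.out.println("three checkpoints triggered: " + done);
        System.out.println(coordinator.triggeredIds);
    }
}
```

Catching exceptions inside the scheduled task matters because, per the `scheduleAtFixedRate` contract, one uncaught exception silently stops every later checkpoint trigger.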

From there, the checkpoint is triggered through snapshotTaskState, which issues an RPC call to each task that must start the checkpoint.

The code iterates over `executions` to trigger the checkpoint. So what exactly is in `executions`?

Flink maintains an array called tasksToTrigger.

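Tracing it backwards leads all the way to JobGraph generation. As both the name and the code suggest, it holds the vertices that have no input channels. Source vertices have no input channels, so the answer to the question above is that `executions` contains the source tasks: when a checkpoint is taken, the CheckpointCoordinator sends the trigger RPC only to the sources.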
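The selection rule ("trigger only vertices without inputs") is small enough to sketch. The `Vertex` class and `tasksToTrigger` method below are hypothetical simplifications; to my understanding Flink's own `JobVertex` exposes a similar `isInputVertex()` check, but this code does not use any Flink API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TriggerVertexSketch {

    /** Hypothetical, simplified job vertex: a name plus the names of its upstream vertices. */
    static final class Vertex {
        final String name;
        final List<String> inputs;

        Vertex(String name, List<String> inputs) {
            this.name = name;
            this.inputs = inputs;
        }

        boolean hasNoInputs() {
            return inputs.isEmpty();
        }
    }

    /** Vertices without inputs (the sources) are the ones the coordinator triggers. */
    static List<String> tasksToTrigger(Map<String, Vertex> jobVertices) {
        List<String> trigger = new ArrayList<>();
        for (Vertex v : jobVertices.values()) {
            if (v.hasNoInputs()) {
                trigger.add(v.name);
            }
        }
        return trigger;
    }

    public static void main(String[] args) {
        Map<String, Vertex> graph = new LinkedHashMap<>();
        graph.put("kafka-source", new Vertex("kafka-source", List.of()));
        graph.put("file-source", new Vertex("file-source", List.of()));
        graph.put("join", new Vertex("join", List.of("kafka-source", "file-source")));
        graph.put("sink", new Vertex("sink", List.of("join")));
        System.out.println(tasksToTrigger(graph)); // [kafka-source, file-source]
    }
}
```

Only the two sources are triggered; `join` and `sink` learn about the checkpoint later, when the barrier reaches them through the data stream.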

After a long chain of calls, we finally arrive at checkpointState in SubtaskCheckpointCoordinatorImpl:

```java
public void checkpointState(
        CheckpointMetaData metadata,
        CheckpointOptions options,
        CheckpointMetricsBuilder metrics,
        OperatorChain<?, ?> operatorChain,
        Supplier<Boolean> isCanceled) throws Exception {

    checkNotNull(options);
    checkNotNull(metrics);

    // All of the following steps happen as an atomic step from the perspective of barriers and
    // records/watermarks/timers/callbacks.
    // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
    // checkpoint alignments

    if (lastCheckpointId >= metadata.getCheckpointId()) {
        LOG.info("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());
        channelStateWriter.abort(
                metadata.getCheckpointId(),
                new CancellationException("checkpoint aborted via notification"),
                true);
        checkAndClearAbortedStatus(metadata.getCheckpointId());
        return;
    }

    // Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint if necessary.
    lastCheckpointId = metadata.getCheckpointId();
    if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
        // broadcast cancel checkpoint marker to avoid downstream back-pressure due to checkpoint barrier align.
        operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
        LOG.info("Checkpoint {} has been notified as aborted, would not trigger any checkpoint.", metadata.getCheckpointId());
        return;
    }

    // if checkpoint has been previously unaligned, but was forced to be aligned (pointwise
    // connection), revert it here so that it can jump over output data
    if (options.getAlignment() == CheckpointOptions.AlignmentType.FORCED_ALIGNED) {
        options = options.withUnalignedSupported();
        initInputsCheckpoint(metadata.getCheckpointId(), options);
    }

    // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
    // The pre-barrier work should be nothing or minimal in the common case.
    operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());

    // Step (2): Send the checkpoint barrier downstream
    LOG.debug(
            "Task {} broadcastEvent at {}, triggerTime {}, passed time {}",
            taskName,
            System.currentTimeMillis(),
            metadata.getTimestamp(),
            System.currentTimeMillis() - metadata.getTimestamp());
    CheckpointBarrier checkpointBarrier =
            new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options);
    operatorChain.broadcastEvent(checkpointBarrier, options.isUnalignedCheckpoint());

    // Step (3): Register alignment timer to timeout aligned barrier to unaligned barrier
    registerAlignmentTimer(metadata.getCheckpointId(), operatorChain, checkpointBarrier);

    // Step (4): Prepare to spill the in-flight buffers for input and output
    if (options.needsChannelState()) {
        // output data already written while broadcasting event
        channelStateWriter.finishOutput(metadata.getCheckpointId());
    }

    // Step (5): Take the state snapshot. This should be largely asynchronous, to not impact
    // progress of the streaming topology
    Map<OperatorID, OperatorSnapshotFutures> snapshotFutures =
            new HashMap<>(operatorChain.getNumberOfOperators());
    try {
        if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)) {
            finishAndReportAsync(snapshotFutures, metadata, metrics, options);
        } else {
            cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
        }
    } catch (Exception ex) {
        cleanup(snapshotFutures, metadata, metrics, ex);
        throw ex;
    }
}
```
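The ordering in checkpointState (ignore out-of-order ids, remember the id, send the barrier, then snapshot with a short synchronous part and a longer asynchronous part) can be illustrated with a toy task. Everything here is a hypothetical sketch: `ToySubtask`, its counter map as "state", and the `CompletableFuture` used as the asynchronous phase are stand-ins, not Flink's state backend or `OperatorSnapshotFutures`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CheckpointStateSketch {

    /** Hypothetical single-operator task whose state is a map of per-key counts. */
    static final class ToySubtask {
        private long lastCheckpointId = -1;
        private final Map<String, Long> state = new HashMap<>();
        final List<String> emitted = new ArrayList<>();
        private final ExecutorService asyncPool = Executors.newSingleThreadExecutor();

        void processRecord(String key) {
            state.merge(key, 1L, Long::sum);
            emitted.add(key);
        }

        /** An empty result means the checkpoint was skipped as out of order. */
        Optional<CompletableFuture<Map<String, Long>>> checkpointState(long checkpointId) {
            if (lastCheckpointId >= checkpointId) {
                return Optional.empty(); // cf. the "Out of order checkpoint barrier" branch
            }
            lastCheckpointId = checkpointId;                  // cf. Step (0)
            emitted.add("barrier-" + checkpointId);           // cf. Step (2): barrier goes out first
            Map<String, Long> copy = new HashMap<>(state);    // cf. Step (5), synchronous part
            // cf. Step (5), asynchronous part: a real backend would persist the copy here.
            return Optional.of(CompletableFuture.supplyAsync(() -> copy, asyncPool));
        }

        void shutdown() {
            asyncPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        ToySubtask task = new ToySubtask();
        task.processRecord("a");
        CompletableFuture<Map<String, Long>> cp1 = task.checkpointState(1).get();
        task.processRecord("a"); // after the barrier, so not part of checkpoint 1
        System.out.println(cp1.get());                           // {a=1}
        System.out.println(task.checkpointState(1).isPresent()); // false
        System.out.println(task.emitted);                        // [a, barrier-1, a]
        task.shutdown();
    }
}
```

Copying the state synchronously is what lets the task keep processing records right away: the second `"a"` changes the live map but not the snapshot that is still being written in the background.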

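The code constructs a CheckpointBarrier, and the source broadcasts this barrier to all downstream nodes as if it were a data record, using operatorChain.broadcastEvent(). This brings us back to the asynchronous barrier snapshotting algorithm introduced at the beginning.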
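The difference between emitting a record and broadcasting a barrier can be shown with one FIFO queue per downstream channel. This is a hypothetical toy (`ToyOutput`, hash routing of records), not Flink's `RecordWriter`. It only models the aligned case: in the real call the second argument, `options.isUnalignedCheckpoint()`, marks the barrier as a priority event so that for unaligned checkpoints it can overtake data already queued.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class BroadcastBarrierSketch {

    /** Hypothetical output side of a source task with one FIFO queue per downstream channel. */
    static final class ToyOutput {
        final List<Deque<String>> channels = new ArrayList<>();

        ToyOutput(int numChannels) {
            for (int i = 0; i < numChannels; i++) {
                channels.add(new ArrayDeque<>());
            }
        }

        /** An ordinary record is routed to exactly one channel, here by hash partitioning. */
        void emitRecord(String record) {
            int target = Math.floorMod(record.hashCode(), channels.size());
            channels.get(target).addLast(record);
        }

        /** A barrier is broadcast: appended to every channel, behind the records already queued. */
        void broadcastEvent(String event) {
            for (Deque<String> channel : channels) {
                channel.addLast(event);
            }
        }
    }

    public static void main(String[] args) {
        ToyOutput out = new ToyOutput(3);
        for (String r : List.of("a", "b", "c", "d")) {
            out.emitRecord(r);
        }
        out.broadcastEvent("barrier-1");
        out.emitRecord("e");
        for (Deque<String> channel : out.channels) {
            System.out.println(channel);
        }
        // [c, barrier-1]
        // [a, d, barrier-1]
        // [b, barrier-1, e]
    }
}
```

Because every channel receives the barrier in order, each downstream task sees a clean split: everything before the barrier belongs to checkpoint 1, and `e` belongs to the next one.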

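How does a downstream task take its snapshot once it receives the barrier? Flink also supports several types of checkpoints, so when is each of them processed? I will analyze the code for these questions in later posts.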


 

Note: this article was contributed by a user of the wpsshop blog and does not represent the site's position; copyright belongs to the original author. Source: https://www.wpsshop.cn/w/你好赵伟/article/detail/548767