当前位置:   article > 正文

flink-16 checkpoint_flink从checkpoint启动

flink从checkpoint启动

checkpoint概念

为了保证state的容错性,flink需要对state进行checkpoint

checkpoint是flink实现容错机制最核心的功能,它能根据配置周期性的基于Stream各个operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来。

当flink程序崩溃时,重新运行程序时可以有选择的从这些快照进行恢复

checkpoint的前提

flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提

  • 数据源:持久化source,它需要支持在一定时间内重放时间
    • 典型例子是持久化的消息队列(kafka RabbitMQ等)
    • 或文件系统(HDFS,S3,GFS等)
  • 存储:用于state的持久化存储,如分布式文件系统(HDFS,S3,GFS等)

checkpoint步骤

  • 暂停数据的流入
  • 等待流中on-the-fly的数据被处理干净,然后得到flink graph的一个Snapchat
  • 将所有Task中State拷贝到State Backedn中,如HDFS。
    • 此动作由各个Task Manager完成
  • 各个Task Mager将Task State的位置上报给Job Manager,完成checkpoint
  • 恢复数据的流入

注意

  • checkpoint前需要暂停数据输入 + 排干on-the-fly数据的操作,这样才能拿到同一时刻下所有subtask的state
  • 达到checkpoint的触发条件后,会把每一个task的状态先写入到内存,然后再写入到hdfs

配置checkpoint

  • 默认checkpoint是disable的,想要使用的时候需要先启用
  • checkpointMode有两种
    • Exactly-once:数据处理且只被处理一次
      • checkpoint开启后,默认的checkpointMode是Exactly-once
      • flink非常容易实现Exactly-once语义,直接可以将消息的state(就是消费信心的offset)保存在HDFS上
    • At-least-once:数据至少被处理一次

Exactly-once对于大多数应用来说是最合适的。At-least-once可能用在某些延迟超低的应用程序(始终延迟为几毫秒)

//默认checkpoint功能是disabled的,想要使用的时候需要先启用
// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
environment.enableCheckpointing(1000);
// 高级选项:
// 设置模式为exactly-once (这是默认值)
environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
environment.getCheckpointConfig.setMinPauseBetweenCheckpoints(500);
// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
environment.getCheckpointConfig.se
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/正经夜光杯/article/detail/876311
推荐阅读
相关标签
  

闽ICP备14008679号