当前位置:   article > 正文

Checkpoint的问题总结_asynchronous task checkpoint failed.

asynchronous task checkpoint failed.

Checkpoint的原理

Flink checkpoint是一种容错恢复机制,保证 Flink 任务运行突然失败时,能够从最近 Checkpoint 进行状态恢复启动,进行错误容忍,是在Chandy–Lamport算法的基础上实现的一种分布式快照算法,其内部使用分布式数据流轻量级异步快照。

Checkpoint流程

  • JobManager触发checkpoint
  • Source 收到 trigger checkpoint,自己开始做 snapshot,并往下游发送 barrier
  • 下游接收 barrier(需要 barrier都到齐才会开始做checkpoint)
  • Task 开始同步阶段 snapshot
  • Task 开始异步阶段 snapshot
  • Task snapshot 完成,汇报给 JobManager

Checkpoint的内容

  • 应用程序的当前状态
  • 输入流的位置

Checkpoint支持语义

  • Exactly Once保证每条数据对于 Flink 的状态结果只影响一次
  • At Least Once保证每条数据对于 Flink 状态计算至少影响一次。比如在 WordCount 程序中,你统计到的某个单词的单词数可能会比真实的单词数要大,因为同一条消息,你可能将其计算多次
  • Exactly Once 和 At Least Once 具体在底层实现大致相同,具体差异表现在 Barrier 对齐方式处理

Exactly Once实现方式

Exactly Once

  • Flink计算的Exactly Once:我们一般讨论的Exactly Once都是Flink计算Exactly Once,Checkpoint 机制只能保证 Flink 的计算过程可以做到 EXACTLY ONCE
  • 端对端的Exactly Once:Kafka在0.11版本首次引入了事务,为在Flink程序中使用Kafka producer提供Exactly-Once语义提供了可能性;Kafaka 0.11 producer的事务是在TwoPhaseCommitSinkFunction基础上实现的,和at-least-once producer相比只增加了非常低的开销。端对端的Exactly Once需要 source 和 sink 支持。数据输出端实现Exactly-Once语义,用户需要实现4个函数:
    • beginTransaction - 在事务开始前,我们在目标文件系统的临时目录中创建一个临时文件。随后,我们可以在处理数据时将数据写入此文件。
    • preCommit - 在预提交阶段,我们刷新文件到存储,关闭文件,不再重新写入。我们还将为属于下一个checkpoint的任何后续文件写入启动一个新的事务。
    • commit - 在提交阶段,我们将预提交阶段的文件原子地移动到真正的目标目录。需要注意的是,这会增加输出数据可见性的延迟。
    • abort - 在中止阶段,我们删除临时文件。

kafka只有一个partition,Exactly Once 和 At Least Once有没有区别

  • 如果只有一个partition,对应flink任务的Source Task并行度只能是1,确实没有区别,不会有至少一次的存在了,肯定是精确一次。因为只有barrier不对齐才会有可能重复处理,这里并行度都已经为1,默认就是对齐的,只有当上游有多个并行度的时候,多个并行度发到下游的barrier才需要对齐,单并行度不会出现barrier不对齐,所以必然精确一次。
  • barrier对齐就是Exactly Once不会重复消费,barrier不对齐就是 At Least Once可能重复消费,这里只有单个并行度根本不会存在barrier不对齐,所以不会存在至少一次语义

Checkpoint状态保留策略

  • DELETE_ON_CANCELLATION 表示当程序取消时,删除 Checkpoint 存储文件
  • RETAIN_ON_CANCELLATION 表示当程序取消时,保存之前的 Checkpoint 存储文件

Checkpoint潜在问题及解决方案

  • 每次进行Checkpoint前,都需要暂停处理新流入数据,然后开始执行快照,假如状态比较大,一次快照可能长达几秒甚至几分钟
    • Flink提供了异步快照(Asynchronous Snapshot)的机制。当实际执行快照时,Flink可以立即向下广播Checkpoint Barrier,表示自己已经执行完自己部分的快照。同时,Flink启动一个后台线程,它创建本地状态的一份拷贝,这个线程用来将本地状态的拷贝同步到State Backend上,一旦数据同步完成,再给Checkpoint Coordinator发送确认信息。拷贝一份数据肯定占用更多内存,这时可以利用写入时复制(Copy-on-Write)的优化策略。
  • Checkpoint Barrier对齐时,必须等待所有上游通道都处理完,假如某个上游通道处理很慢,这可能造成整个数据流堵塞
    • Flink允许跳过对齐这一步,或者说一个算子子任务不需要等待所有上游通道的Checkpoint Barrier,直接将Checkpoint Barrier广播,执行快照并继续处理后续流入数据。为了保证数据一致性,Flink必须将那些较慢的数据流中的元素也一起快照,一旦重启,这些元素会被重新处理一遍。

Checkpoint失败原因分析

  • Checkpoint Decline:当前 Flink 中如果较小的 Checkpoint 还没有对齐的情况下,收到了更大的 Checkpoint,则会把较小的 Checkpoint 给取消掉
  • Checkpoint Expire:如果 Checkpoint 做的非常慢,超过了 timeout 还没有完成,则整个 Checkpoint 会失败

Checkpoint慢的原因分析

  • Source Trigger Checkpoint 慢
    • 这个一般发生较少,但是也有可能,因为 source 做 snapshot 并往下游发送 barrier 的时候,需要抢锁(这个现在社区正在进行用 mailBox 的方式替代当前抢锁的方式。如果一直抢不到锁的话,则可能导致 Checkpoint 一直得不到机会进行。
  • Barrier 对齐慢
    • Checkpoint在task 端分为barrier对齐(收齐所有上游发送过来的 barrier),然后开始同步阶段,再做异步阶段。如果 barrier 一直对不齐的话,就不会开始做 snapshot
  • 同步阶段做的慢
    • 同步阶段一般不会太慢
    • 对于非 RocksDBBackend 我们可以考虑查看是否开启了异步 snapshot,如果开启了异步 snapshot 还是慢,需要看整个 JVM 在干嘛,也可以使用前一节中的工具
    • 对于 RocksDBBackend 来说,我们可以用 iostate 查看磁盘的压力如何,另外可以查看 tm 端 RocksDB 的 log 的日志如何,查看其中 SNAPSHOT 的时间总共开销多少
  • 异步阶段做的慢
    • 对于异步阶段来说,tm 端主要将 state 备份到持久化存储上
    • 对于非 RocksDBBackend 来说,主要瓶颈来自于网络,这个阶段可以考虑观察网络的 metric,或者对应机器上能够观察到网络流量的情况(比如 iftop)
    • 对于 RocksDB 来说,则需要从本地读取文件,写入到远程的持久化存储上,所以不仅需要考虑网络的瓶颈,还需要考虑本地磁盘的性能。另外对于 RocksDBBackend 来说,如果觉得网络流量不是瓶颈,但是上传比较慢的话,还可以尝试考虑开启多线程上传功能
  • 使用全量Checkpoint
    • 全量 Checkpoint 会把当前的 state 全部备份一次到持久化存储
    • 增量 Checkpoint,则只备份上一次 Checkpoint 中不存在的 state,因此增量 Checkpoint 每次上传的内容会相对更好,在速度上会有更大的优势
    • 仅在 RocksDBStateBackend 中支持增量 Checkpoint,如果你已经使用 RocksDBStateBackend,可以通过开启增量 Checkpoint来提升checkpoint的速度
  • 作业存在反压或者数据倾斜
    •  task 仅在接受到所有的 barrier 之后才会进行 snapshot,如果作业存在反压,或者有数据倾斜,则会导致全部的 channel 或者某些 channel 的 barrier 发送慢,从而整体影响 Checkpoint 的时间
  • 主线程太忙,导致没机会做 snapshot
    • 在 task 端,所有的处理都是单线程的,数据处理和 barrier 处理都由主线程处理,如果主线程在处理太慢(比如使用 RocksDBBackend,state 操作慢导致整体处理慢),导致 barrier 处理的慢,也会影响整体 Checkpoint 的进度

参考

https://www.toutiao.com/a6737459892816183811/

https://blog.csdn.net/u013411339/article/details/90625521

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/816214
推荐阅读
相关标签
  

闽ICP备14008679号