当前位置:   article > 正文

Flink 1. 13(六)容错机制_flink uid

flink uid

一.检查点

1.概述

发生故障之后,最简单的想法当然是重启机器、重启应用。由于是分布式的集群,即使一个节点无法恢复,也不会影响应用的重启执行。这里的问题在于,流处理应用中的任务都是有状态的,而为了快速访问这些状态一般会直接放在堆内存里;现在重启应用,内存中的状态已经丢失,就意味着之前的计算全部白费了,需要从头来过。就像编写文档或是玩 RPG游戏,因为宕机没保存而要重来一遍是一件令人崩溃的事情;这种惨痛的经历让我们养成了一个好习惯——随时存档,这样即使遇到宕机也可以读档继续了

流处理中,我们同样可以用存档读档的思路,把之前的计算结果做个保存,这样重启之后就可以继续处理新数据、而不需要重新计算了。进一步地,我们知道在有状态的流处理中,任务继续处理新数据,并不需要“之前的计算结果”,而是需要任务“之前的状态”。所以我们最终的选择,就是将之前某个时间点所有的状态保存下来,这份“存档”就是所谓的“检查点”

遇到故障重启的时候,我们可以从检查点中“读档”,恢复出之前的状态,这样就可以回到当时保存的一刻接着处理数据了。检查点是 Flink 容错机制的核心。这里所谓的“检查”,其实是针对故障恢复的结果而言的:故障恢复之后继续处理的结果,应该与发生故障前完全一致,我们需要“检查”结果的正确性。所以,有时又会把 checkpoint 叫作“一致性检查点”

2.检查点的保存

“随时存档”确实恢复起来方便,可是需要我们不停地做存档操作。如果每处理一条数据就进行检查点的保存,当大量数据同时到来时,就会耗费很多资源来频繁做检查点,数据处理的速度就会受到影响。所以更好的方式是,每隔一段时间去做一次存档,这样既不会影响数据的正常处理,也不会有太大的延迟——毕竟故障恢复的情况不是随时发生的

在 Flink 中,检查点的保存是周期性触发的,间隔时间可以进行设置

所以检查点作为应用状态的一份“存档”,其实就是所有任务状态在同一时间点的一个“快照”(snapshot),它的触发是周期性的。具体来说,当每隔一段时间检查点保存操作被触发时,就把每个任务当前的状态复制一份,按照一定的逻辑结构放在一起持久化保存起来,就构成了检查点

当所有任务都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来

在这里插入图片描述

例如上图中,已经处理了 3 条数据:“hello”“world”“hello”,所以我们会看到 Source 算子的偏移量为 3;后面的 Sum 算子处理完第三条数据“hello”之后,此时已经有 2 个“hello”和 1 个“world”,所以对应的状态为“hello”-> 2,“world”-> 1(这里 KeyedState底层会以 key-value 形式存储)。此时所有任务都已经处理完了前三个数据,所以我们可以把当前的状态保存成一个检查点,写入外部存储中。至于具体保存到哪里,这是由状态后端的配置项 “ 检查点存储 ”( CheckpointStorage )来决定的

3.从检查点恢复状态

例如在上节的 word count 示例中,我们处理完三个数据后保存了一个检查点。之后继续运行,又正常处理了一个数据“flink”,在处理第五个数据“hello”时发生了故障

在这里插入图片描述
这里 Source 任务已经处理完毕,所以偏移量为 5;Map 任务也处理完成了。而 Sum 任务在处理中发生了故障,此时状态并未保存

接下来就需要从检查点来恢复状态了。具体的步骤为:

(1)重启应用
遇到故障之后,第一步当然就是重启。我们将应用重新启动后,所有任务的状态会清空

在这里插入图片描述

(2)读取检查点,重置状态
找到最近一次保存的检查点,从中读出每个算子任务状态的快照,分别填充到对应的状态中。这样,Flink 内部所有任务的状态,就恢复到了保存检查点的那一时刻,也就是刚好处理完第三个数据的时候,这里 key 为“flink”并没有数据到来,所以初始为0
在这里插入图片描述

(3)重放数据
从检查点恢复状态后还有一个问题:如果直接继续处理数据,那么保存检查点之后、到发生故障这段时间内的数据,也就是第 4、5 个数据(“flink”“hello”)就相当于丢掉了;这会造成计算结果的错误

为了不丢数据,我们应该从保存检查点后开始重新读取数据,这可以通过 Source 任务向外部数据源重新提交偏移量(offset)来实现

在这里插入图片描述
这样,整个系统的状态已经完全回退到了检查点保存完成的那一时刻

(4)继续处理数据
接下来,我们就可以正常处理数据了。首先是重放第 4、5 个数据,然后继续读取后面的数据

在这里插入图片描述

当处理到第 5 个数据时,就已经追上了发生故障时的系统状态。之后继续处理,就好像没有发生过故障一样;我们既没有丢掉数据也没有重复计算数据,这就保证了计算结果的正确性

4.检查点算法

前面我们讲到,检查点保存时,某一个数据都被处理完了,问题就是每个算子的进度不同,假如保存前3个数据时,source算子只能等待后面的算子处理完成后才能继续工作,很明显降低了效率

检查点分界线

我们现在的目标是,在不暂停流处理的前提下,让每个任务“认出”触发检查点保存的那个数据

所以我们可以借鉴水位线(watermark)的设计,在数据流中插入一个特殊的数据结构,叫做检查点的分界线,专门用来表示触发检查点保存的时间点。收到保存检查点的指令后,Source 任务可以在当前数据流中插入这个结构;之后的所有任务只要遇到它就开始对状态做持久化快照保存,检查点分界线中带有一个检查点 ID,这是当前要保存的检查点的唯一标识

理解这句话很重要,分界线就将一条流逻辑上分成了两部分:分界线之前的数据导致的状态更改,都会被包含在当前分界线所表示的检查点中;而基于分界线之后的数据导致的状态更改,则会被包含在之后的检查点中(也就是下一个检查点)

下图中,为什么source保存的是3呢?因为在barrier之前有3个数据已经出去了,hello有一个已经被处理完成了,还有一个hello在执行map,另外一个world在执行sum,这也就是barrier为什么叫分界线,分的是barrier前后两部分,保存的检查点内容也只是barrier前面的

在这里插入图片描述

每个算子任务只要处理到这个 barrier,就把当前的状态进行快照;在收到 barrier 之前,还是正常地处理之前的数据,完全不受影响。比如上图中,Source 任务收到 1 号检查点保存指令时,读取完了三个数据,所以将偏移量 3 保存到外部存储中;而后将 ID 为 1 的 barrier 注入数据流;与此同时,Map 任务刚刚收到上一条数据“hello”,而 Sum 任务则还在处理之前的第二条数据(world, 1)。下游任务不会在这时就立刻保存状态,而是等收到 barrier 时才去做快照,这时可以保证前三个数据都已经处理完了。同样地,下游任务做状态快照时,也不会影响上游任务的处理,每个任务的快照保存并行不悖,不会有暂停等待的时间

分布式快照

Flink 使用了 Chandy-Lamport 算法的一种变体,被称为“异步分界线快照”(asynchronous barrier snapshotting)算法。算法的核心就是两个原则:当上游任务向多个并行下游任务发送 barrier 时,需要广播出去;而当多个上游任务向同一个下游任务传递 barrier 时,需要在下游任务执行“分界线对齐”(barrier alignment)操作,也就是需要等到所有并行分区的 barrier 都到齐,才可以开始状态的保存

Flink的数据流任务会等待所有上游任务的barrier都到达,并等待所有的分区都到达相应的barrier之后,才会开始处理数据
在这里插入图片描述

完成检查点保存之后,任务就可以继续正常处理数据了。这时如果有等待分界线对齐时缓存的数据,需要先做处理;然后再按照顺序依次处理新到的数据。当 JobManager 收到所有任务成功保存状态的信息,就可以确认当前检查点成功保存。之后遇到故障就可以从这里恢复了

由于分界线对齐要求先到达的分区做缓存等待,一定程度上会影响处理的速度;当出现背压(backpressure)时,下游任务会堆积大量的缓冲数据,检查点可能需要很久才可以保存完毕。为了应对这种场景,Flink 1.11 之后提供了不对齐的检查点保存方式,可以将未处理的缓冲数据(in-flight data)也保存进检查点。这样,当我们遇到一个分区 barrier 时就不需等待对齐,而是可以直接启动状态的保存了

5.检查点配置

检查点开启

默认情况下,Flink 程序是禁用检查点的。如果想要为 Flink 应用开启自动保存快照的功能,需要在代码中显式地调用执行环境的.enableCheckpointing()方法:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔 1 秒启动一次检查点保存
env.enableCheckpointing(1000);
  • 1
  • 2
  • 3

检查点存储

检查点具体的持久化存储位置,取决于“检查点存储”(CheckpointStorage)的设置。默认情况下,检查点存储在 JobManager 的堆(heap)内存中。而对于大状态的持久化保存,Flink也提供了在其他存储位置进行保存的接口,这就是 CheckpointStorage,CheckpointStorage 描述了 Checkpoint 行为,定义 Checkpoint 的存储位置和方式以进行故障恢复,与Flink1.12差距很大

具体可以通过调用检查点配置的 .setCheckpointStorage() 来 配 置 , 需要传入一个CheckpointStorage 的实现类。Flink 主要提供了两种 CheckpointStorage:作业管理器的堆内存(JobManagerCheckpointStorage)和文件系统(FileSystemCheckpointStorage)

// 配置存储检查点到 JobManager 堆内存
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
// 配置存储检查点到文件系统
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));
  • 1
  • 2
  • 3
  • 4

其他高级配置

检查点还有很多可以配置的选项,可以通过获取检查点配置(CheckpointConfig)来进行设置

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
  • 1

我们这里做一个简单的列举说明:

(1)检查点模式(CheckpointingMode)
设置检查点一致性的保证级别,有“精确一次”(exactly-once)和“至少一次”(at-least-once)两个选项。默认级别为 exactly-once,而对于大多数低延迟的流处理程序,at-least-once 就够用了

(2)超时时间(checkpointTimeout)

用于指定检查点保存的超时时间,超时没完成就会被丢弃掉。传入一个长整型毫秒数作为参数,表示超时时间

(3)最小间隔时间(minPauseBetweenCheckpoints)

用于指定在上一个检查点完成之后,检查点协调器(checkpoint coordinator)最快等多久可以出发保存下一个检查点的指令。这就意味着即使已经达到了周期触发的时间点,只要距离上一个检查点完成的间隔不够,就依然不能开启下一次检查点的保存。这就为正常处理数据留下了充足的间隙。当指定这个参数时maxConcurrentCheckpoints 的值强制为 1

(4)最大并发检查点数量(maxConcurrentCheckpoints)

用于指定运行中的检查点最多可以有多少个。由于每个任务的处理进度不同,完全可能出现后面的任务还没完成前一个检查点的保存、前面任务已经开始保存下一个检查点了。这个参数就是限制同时进行的最大数量。如果前面设置了 minPauseBetweenCheckpoints,则 maxConcurrentCheckpoints 这个参数就不起作用了

(5)开启外部持久化存储(enableExternalizedCheckpoints)

用于开启检查点的外部持久化,而且默认在作业失败的时候不会自动清理,如果想释放空间需要自己手工清理。里面传入的参数 ExternalizedCheckpointCleanup 指定了当作业取消的时候外部的检查点该如何清理

  • DELETE_ON_CANCELLATION:在作业取消的时候会自动删除外部检查点,但是如果是作业失败退出,则会保留检查点
  • RETAIN_ON_CANCELLATION:作业取消的时候也会保留外部检查点

(6)检查点异常时是否让整个任务失败(failOnCheckpointingErrors)

用于指定在检查点发生异常的时候,是否应该让任务直接失败退出。默认为 true,如果设置为 false,则任务会丢弃掉检查点然后继续运行

(7)不对齐检查点(enableUnalignedCheckpoints)

不再执行检查点的分界线对齐操作,启用之后可以大大减少产生背压时的检查点保存时间。这个设置要求检查点模式(CheckpointingMode)必须为 exctly-once,并且并发的检查点个数为 1

代码中具体设置如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用检查点,间隔时间 1 秒
env.enableCheckpointing(1000);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// 设置精确一次模式
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 最小间隔时间 500 毫秒
checkpointConfig.setMinPauseBetweenCheckpoints(500);
// 超时时间 1 分钟
checkpointConfig.setCheckpointTimeout(60000);
// 同时只能有一个检查点
checkpointConfig.setMaxConcurrentCheckpoints(1);
// 开启检查点的外部持久化保存,作业取消后依然保留
checkpointConfig.enableExternalizedCheckpoints(
 ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 启用不对齐的检查点保存方式
checkpointConfig.enableUnalignedCheckpoints();
// 设置检查点存储,可以直接传入一个 String,指定文件系统的路径
checkpointConfig.setCheckpointStorage("hdfs://my/checkpoint/dir")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

二.保存点

从名称就可以看出,这也是一个存盘的备份,它的原理和算法与检查点完全相同,只是多了一些额外的元数据。事实上,保存点就是通过检查点的机制来创建流式作业状态的一致性镜像(consistent image)的

保存点中的状态快照,是以算子 ID 和状态名称组织起来的,相当于一个键值对。从保存点启动应用程序时,Flink 会将保存点的状态数据重新分配给相应的算子任务

保存点的用途

保存点与检查点最大的区别,就是触发的时机。检查点是由 Flink 自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能;而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。因此两者尽管原理一致,但用途就有所差别了:检查点主要用来做故障恢复,是容错机制的核心;保存点则更加灵活,可以用来做有计划的手动备份和恢复

保存点可以当作一个强大的运维工具来使用。我们可以在需要的时候创建一个保存点,然后停止应用,做一些处理调整之后再从保存点重启

需要注意的是,保存点能够在程序更改的时候依然兼容,前提是状态的拓扑结构和数据类型不变。我们知道保存点中状态都是以算子 ID-状态名称这样的 key-value 组织起来的,算子ID 可以在代码中直接调用SingleOutputStreamOperator 的.uid()方法来进行指定:

DataStream<String> stream = env
 .addSource(new StatefulSource())
 .uid("source-id")
 .map(new StatefulMapper())
 .uid("mapper-id")
 .print();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

对于没有设置 ID 的算子,Flink 默认会自动进行设置,所以在重新启动应用后可能会导致ID 不同而无法兼容以前的状态。所以为了方便后续的维护,强烈建议在程序中为每一个算子手动指定 ID

使用保存点

保存点的使用非常简单,我们可以使用命令行工具来创建保存点,也可以从保存点恢复作业

(1)创建保存点
要在命令行中为运行的作业创建一个保存点镜像,只需要执行:

bin/flink savepoint :jobId [:targetDirectory]
  • 1

这里 jobId 需要填充要做镜像保存的作业 ID,目标路径 targetDirectory 可选,表示保存点存储的路径对于保存点的默认路径,可以通过配置文件 flink-conf.yaml 中的 state.savepoints.dir 项来设定:

state.savepoints.dir: hdfs:///flink/savepoints
  • 1

当然对于单独的作业,我们也可以在程序代码中通过执行环境来设置:

env.setDefaultSavepointDir("hdfs:///flink/savepoints");
  • 1

由于创建保存点一般都是希望更改环境之后重启,所以创建之后往往紧接着就是停掉作业的操作。除了对运行的作业创建保存点,我们也可以在停掉一个作业时直接创建保存点:

bin/flink stop --savepointPath [:targetDirectory] :jobId
  • 1

从保存点重启应用

我们已经知道,提交启动一个 Flink 作业,使用的命令是 flink run;现在要从保存点重启一个应用,其实本质是一样的:

bin/flink run -s :savepointPath [:runArgs]
  • 1

这里只要增加一个-s 参数,指定保存点的路径就可以了,其他启动时的参数还是完全一样的。细心的读者可能还记得我们在第三章使用 web UI 进行作业提交时,可以填入的参数除了入口类、并行度和运行参数,还有一个“Savepoint Path”,这就是从保存点启动应用的配置

三.状态一致性

1.概述

简单来讲,一致性其实就是结果的正确性。对于分布式系统而言,强调的是不同节点中相同数据的副本应该总是“一致的”,也就是从不同节点读取时总能得到相同的值;而对于事务而言,是要求提交更新操作后,能够读取到新的数据。对于 Flink 来说,多个节点并行处理不同的任务,我们要保证计算结果是正确的,就必须不漏掉任何一个数据,而且也不会重复处理同一个数据。流式计算本身就是一个一个来的,所以正常处理的过程中结果肯定是正确的;但在发生故障、需要恢复状态进行回滚时就需要更多的保障机制了。我们通过检查点的保存来保证状态恢复后结果的正确,所以主要讨论的就是“状态的一致性”

一般说来,状态一致性有三种级别:

  • 最多一次(AT-MOST-ONCE)

当任务发生故障时,最简单的做法就是直接重启,别的什么都不干;既不恢复丢失的状态,也不重放丢失的数据。每个数据在正常情况下会被处理一次,遇到故障时就会丢掉,所以就是“最多处理一次”。我们发现,如果数据可以直接被丢掉,那其实就是没有任何操作来保证结果的准确性;所以这种类型的保证也叫“没有保证”。尽管看起来比较糟糕,不过如果我们的主要诉求是“快”,而对近似正确的结果也能接受,那这也不失为一种很好的解决方案

  • 至少一次(AT-LEAST-ONCE)

在实际应用中,我们一般会希望至少不要丢掉数据。这种一致性级别就叫作“至少一次”(at-least-once),就是说是所有数据都不会丢,肯定被处理了;不过不能保证只处理一次,数据源重放,有些数据会被重复处理

  • 精确一次(EXACTLY-ONCE)

最严格的一致性保证,就是所谓的“精确一次”(exactly-once,有时也译作“恰好一次”)。这也是最难实现的状态一致性语义。exactly-once 意味着所有数据不仅不会丢失,而且只被处理一次,不会重复处理。也就是说对于每一个数据,最终体现在状态和输出结果上,只能有一次统计。exactly-once 可以真正意义上保证结果的绝对正确,在发生故障恢复后,就好像从未发生过故障一样

很明显,要做的 exactly-once,首先必须能达到 at-least-once 的要求,就是数据不丢。所以同样需要有数据重放机制来保证这一点。另外,还需要有专门的设计保证每个数据只被处理一次。Flink 中使用的是一种轻量级快照机制——检查点(checkpoint)来保证 exactly-once 语义

2.端到端精确一次

实际应用中,最难做到、也最希望做到的一致性语义,无疑就是端到端(end-to-end)的“精确一次”(exactly-once)。我们知道,对于 Flink 内部来说,检查点机制可以保证故障恢复后数据不丢(在能够重放的前提下),并且只处理一次,所以已经可以做到 exactly-once 的一致性语义了

需要注意的是,我们说检查点能够保证故障恢复后数据只处理一次,并不是说之前统计过某个数据,现在就不能再次统计了;而是要看状态的改变和输出的结果,是否只包含了一次这个数据的处理。由于检查点保存的是之前所有任务处理完某个数据后的状态快照,所以重放的数据引起的状态改变一定不会包含在里面,最终结果中只处理了一次

所以,端到端一致性的关键点,就在于输入的数据源端和输出的外部存储端

端到端精确一次保证

  • 内部保证 - checkpoint
  • source端 - 可重设数据的读取位置
  • sink端 - 从故障恢复时,数据不会重复写入外部系统
    • 幂等写入
    • 事务写入

1.幂等(idempotent)写入

所谓“幂等”操作,就是说一个操作可以重复执行很多次,但只导致一次结果更改。也就是说,后面再重复执行就不会对结果起作用了

这相当于说,我们并没有真正解决数据重复计算、写入的问题;而是说,重复写入也没关系,结果不会改变。所以这种方式主要的限制在于外部存储系统必须支持这样的幂等写入:比如 Redis 中键值存储,或者关系型数据库(如 MySQL)中满足查询条件的更新操作。需要注意,对于幂等写入,遇到故障进行恢复时,有可能会出现短暂的不一致。因为保存点完成之后到发生故障之间的数据,其实已经写入了一遍,回滚的时候并不能消除它们。如果有一个外部应用读取写入的数据,可能会看到奇怪的现象:短时间内,结果会突然“跳回”到之前的某个值,然后“重播”一段之前的数据。不过当数据的重放逐渐超过发生故障的点的时候,最终的结果还是一致

关于跳回的例子,比如redis的某个K-V键值对,正常修改是K- 1、K - 2、K - 3…,如果检查点保存的刚好到K - 8,这时候K - 11出现了故障,肯定要回滚,但是redis依旧是K -11,所以这个K- V又要经历一次 K - 9、K -10 、K -11

2. 事务(transactional)写入

如果说幂等写入对应用场景限制太多,那么事务写入可以说是更一般化的保证一致性的方式

之前我们提到,输出端最大的问题就是“覆水难收”,写入到外部系统的数据难以撤回

自然想到,那怎样可以收回一条已写入的数据呢?利用事务就可以做到

我们都知道,事务(transaction)是应用程序中一系列严密的操作,所有操作必须成功完成,否则在每个操作中所做的所有更改都会被撤消。事务有四个基本特性:原子性(Atomicity)、一致性(Correspondence)、隔离性(Isolation)和持久性(Durability),这就是著名的 ACID

在 Flink 流处理的结果写入外部系统时,如果能够构建一个事务,让写入操作可以随着检查点来提交和回滚,那么自然就可以解决重复写入的问题了。所以事务写入的基本思想就是:用一个事务来进行数据向外部系统的写入,这个事务是与检查点绑定在一起的。当 Sink 任务遇到 barrier 时,开始保存状态的同时就开启一个事务,接下来所有数据的写入都在这个事务中;待到当前检查点保存完毕时,将事务提交,所有写入的数据就真正可用了。如果中间过程出现故障,状态会回退到上一个检查点,而当前事务没有正常关闭(因为当前检查点没有保存完),所以也会回滚,写入到外部的数据就被撤销了

在这里插入图片描述

具体来说,又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)

预写日志就是一种非常简单的方式。具体步骤是:

①先把结果数据作为日志(log)状态保存起来
②进行检查点保存时,也会将这些结果数据一并做持久化存储
③在收到检查点完成的通知时,将所有结果一次性写入外部系统

我们注意到,前两步操作已经将数持久化,但是如果第三步的检查点通知没有成功给Job Manger,那么Flink认为此次写入还是失败的,因此恢复到上一个检查点,显然,这会导致这批数据重复写入

两阶段提交(two-phase-commit,2PC)

前面提到的各种实现 exactly-once 的方式,多少都有点缺陷,有没有更好的方法呢?自然
是有的,这就是传说中的两阶段提交(2PC)

顾名思义,它的想法是分成两个阶段:先做“预提交”,等检查点完成之后再正式提交。这种提交方式是真正基于事务的,它需要外部系统提供事务支持

具体的实现步骤为:

①当第一条数据到来时,或者收到检查点的分界线时,Sink 任务都会启动一个事务
②接下来接收到的所有数据,都通过这个事务写入外部系统;这时由于事务没有提交,所以数据尽管写入了外部系统,但是不可用,是“预提交”的状态,如果提交到kafka,kafka会将这些数据标记为未确认的数据
③当所有算子的快照都完成,也就是这次的检查点保存最终完成时,JobManager 会向所有任务发确认通知,告诉大家当前检查点已成功保存,当 Sink 任务收到 JobManager 发来检查点完成的通知时,正式提交事务,写入的结果就真正可用了,例如kafka就将这些数据标记为已确认了

分界线的到来,就标志着开始一个新事务;而收到来自 JobManager 的 checkpoint 成功的消息,就是提交事务 的指令

因此,只有在source可重置的情况下,使用2PC才能实现精确一次,不过要注意,2PC对外部系统的要求较高

3.端到端kafka配置

在具体应用中,实现真正的端到端 exactly-once,还需要有一些额外的配置:

(1)必须启用检查点;
(2)在 FlinkKafkaProducer 的构造函数中传入参数 Semantic.EXACTLY_ONCE;

 public static enum Semantic {
        EXACTLY_ONCE,
        AT_LEAST_ONCE,
        NONE;

        private Semantic() {
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

(3)配置 Kafka 读取数据的消费者的隔离级别

这里所说的 Kafka,是写入的外部系统。预提交阶段数据已经写入,只是被标记为“未提交”(uncommitted),而 Kafka 中默认的隔离级别 isolation.level 是 read_uncommitted,也就是可以读取未提交的数据。这样一来,外部应用就可以直接消费未提交的数据,对于事务性的保证就失效了。所以应该将隔离级别配置为 read_committed,表示消费者遇到未提交的消息时,会停止从分区中消费数据,直到消息被标记为已提交才会再次恢复消费。当然,这样做的话,外部应用消费数据就会有显著的延迟

(4)事务超时配置

Flink 的 Kafka连接器中配置的事务超时时间 transaction.timeout.ms 默认是 1小时,而Kafka集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是 15 分钟。所以在检查点保存时间很长时,有可能出现 Kafka 已经认为事务超时了,丢弃了预提交的数据;而 Sink 任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。所以这两个超时时间,前者应该小于等于后者

四.Flink背压

什么是背压?

Flink背压(Backpressure)是指在数据处理流程中,当数据生产速度快于消费速度时,消费者对生产者施加一定的反压力以平衡数据流的速率

Flink内部对背压的处理

异步边界 Async Boundary:Flink引入了异步边界来划分数据流的边界。异步边界是指在数据流中的某个位置,将数据的生产和消费分开,形成一种异步的关系。背压机制只会在异步边界处引入,避免影响整个数据流的处理。

数据缓存:当上游算子收到背压信号时,Flink会将部分未处理的输入数据缓存在输入缓存中,等待下游算子的处理。这样可以避免数据的丢失,并提高数据处理的效率,Flink使用基于Credit的背压机制来实现上下游算子之间的数据协调。上游算子根据下游算子的Credit情况调整数据的产生速率,以避免下游的背压情况。这种机制能够动态地保持数据处理的平衡。

动态重分区(Dynamic Repartitioning):Flink支持动态重分区,即根据背压的情况,动态地增加或减少算子之间的分区数量。这样可以在背压发生时进行动态调整,以适应数据的处理能力。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/385190
推荐阅读
相关标签
  

闽ICP备14008679号