当前位置:   article > 正文

「Flink反压机制」看这一篇就够了

flink反压

1 网络流量控制概念与背景

背景:较高的发送速度和较低的接收能力的不匹配,会造成传输出错。

1.1 为什么要进行流量控制

假设Producer和Consumer的吞吐率分别为2MB/s、1MB/s,生产者速度比消费者快1MB/s。假设两端都有一个Buffer,网络端的吞吐率是2MB/s。过了5s后Receive Buffer可能就撑不住了,这时候会面临两种情况:

  • 如果 Receive Buffer 是有界的,这时候新到达的数据就只能被丢弃掉了。
  • 如果 Receive Buffer 是无界的,Receive Buffer 会持续的扩张,最终会导致 Consumer 的内存耗尽。

1.2 网络流量控制实现-静态限速

传统的做法是在发送端实现一个类似Rate Limiter这样的静态限流,发送端经过限流层之后以1MB/s的吞吐率发往接收端,这样就解决了Producer和Consumer处理速率不一致的情况;

方案存在的缺点:

  • 事先无法预估 Consumer 到底能承受多大的速率
  • Consumer 的承受能力通常会动态地波动
    在这里插入图片描述

1.3 网络流量控制实现-动态反馈

动态反馈需要 Consumer 能够及时的给 Producer 进行反馈,告知 Producer 当前能够承受的速率是多少。动态反馈分为两种:

  • 负反馈:接收速率 < 发送速率时,告知 Producer 降低发送速率;
  • 正反馈:发送速率 > 接收速率时,告知 Producer 提高发送速率。

1.4 Flink网络传输架构

Flink 在做网络传输的时候基本的数据的流向
在这里插入图片描述
发送端发送数据前会经历自己内部的三层缓冲,(Flink)Network Buffer -> (Netty)ChannelOutbound Buffer -> (Socket)Send Buffer;同样在接收端也会有三层Buffer。

最终数据是经过TCP发送,TCP有流量控制机制,实际上 Flink (before V1.5)就是通过 TCP 的流控机制来实现 feedback 的。

TCP流量控制
TCP可靠传输机制

2 TCP流量控制

TCP报文格式

TCP 包的格式结构
1)Sequence number机制:用来给每个数据包做一个编号
2)ACK number机制:确保TCP数据传输的可靠性
3)Window Size机制:接收端在回复消息的时候会通过 Window Size 告诉发送端还可以发送多少数据。
在这里插入图片描述

2.1 TCP流量控制-滑动窗口

在这里插入图片描述
TCP基于滑动窗口实现流量控制。

现在有一个Socket 的发送端和一个 Socket 的接收端,发送端的速率是我们接收端的 3 倍,假设发送端初始窗口大小为3,接收端窗口大小为5,接收端的 Buffer 大小为 5。

在这里插入图片描述
(1)首先,发送端会一次性发 3 个 packets,将 1,2,3 发送给接收端,接收端接收到后会将这 3 个 packets 放到 Buffer 里去。

在这里插入图片描述

(2)当1被接收端消费后,接收端的滑动窗口往前滑动一步,这时2 3还在Buffer中,而4 5 6是空出来的,接收端将 ACK=4 Window = 3(5-2) 发送给发送端,代表发送端可以从4开始发送,发送端接收到响应后,会将滑动窗口往前滑动三步。

在这里插入图片描述
(3)发送端发送4 5 6,接收端目前buffer还有3个位置,能够成功接收 4 5 6。

在这里插入图片描述
(4)接收端消费掉2,同时接收端的滑动窗口向前移动一步,buffer剩下1个,于是向发送端发送ack = 7,window = 1。发送端接收到反馈后,虽然发送端的窗口大小为3,但是接收端只能接收1个,所以发送端滑动窗口向前移动一步到7,以此来达到限流的效果,发送端速度从3降到1

在这里插入图片描述
(5)我们再看一下这种情况,这时候发送端将 7 发送后,接收端接收到,但是由于接收端的消费出现问题,一直没有从 Buffer 中去取,这时候接收端向发送端发送 ACK = 8、window = 0 ,由于这个时候 window = 0,发送端是不能发送任何数据,也就会使发送端的发送速度降为 0。这个时候发送端不发送任何数据了,接收端也不进行任何的反馈了,那么如何知道消费端又开始消费了呢?

  • TCP 当中有一个 ZeroWindowProbe 的机制,发送端会定期的发送 1 个字节的探测消息,这时候接收端就会把 window 的大小进行反馈。当接收端的消费恢复了之后,接收到探测消息就可以将 window 反馈给发送端端了从而恢复整个流程。TCP 就是通过这样一个滑动窗口的机制实现 feedback

3 Flink TCP-based反压机制(1.5版本之前)

3.1 任务执行流程

参考文档
在这里插入图片描述

1)编写代码

根据用户编写的代码会生成最初的DAG图,用来表示程序的拓扑结构,即逻辑流图(streamGraph)

2)编译阶段,生成 作业图(JobGraph)

客户端会将StreamGraph生成JobGraph,主要是做一些算子链合并,JobGraph 就是做为向集群提交的最基本的单元。有了 JobGraph 后就会向集群进行提交,进入运行阶段。

3)运行阶段

JobGraph 提交到集群后会生成 执行图(ExecutionGraph) ,ExecutionGraph是 JobGraph 的并行化版本,是调度层最核心的数据结构。与 JobGraph 最大的区别就是按照并行度对并行子任务进行了拆分,分解为不同的subTask,并明确了任务间数据传输的方式。

在这里插入图片描述

上图 ExecutionGraph 中的 Intermediate Result Partition 就是用于发送数据的模块,最终会将 ExecutionGraph 交给 JobManager 的调度器,将整个 ExecutionGraph 调度起来。

每个Task接收数据时,经过的流程:

ResultPartition 负责发送数据,中间的 InputGate 负责接收数据,发送到task之后,在下游的 ResultPartition 对数据做重分区,就形成了 ResultSubPartition 和 InputChannel 的对应关系。
这就是从逻辑层上来看的网络传输的通道,基于这么一个概念我们可以将反压的问题进行拆解。

3.2 问题拆解-反压传播两个阶段

在这里插入图片描述
反压的传播实际上是分为两个阶段的,对应着上面的执行图,我们一共涉及 3 个 TaskManager,在每个 TaskManager 里面都有相应的 Task 在执行,还有负责接收数据的 InputGate发送数据的 ResultPartition,这就是一个最基本的数据传输的通道。在这时候假设最下游的 Task (Sink)出现了问题,处理速度降了下来这时候是如何将这个压力反向传播回去呢?这时候就分为两种情况:

  • 跨 TaskManager ,反压如何从 InputGate 传播到 ResultPartition。
  • TaskManager 内,反压如何从 ResultPartition 传播到 InputGate。

3.3 跨TaskManager数据传输

在这里插入图片描述
发送数据需要 ResultPartition,在每个 ResultPartition 里面会有分区 ResultSubPartition,中间还会有一些关于内存管理的 Buffer。每一个 TaskManager 会有一个统一的网络缓冲池(Network BufferPool),被所有Task共享,在初始化时会从堆外内存( Off-heap Memory )中申请内存,申请内存后就可以为每一个 ResultSubPartition 创建 本地缓冲池(Local BufferPool)

如上图左边的 TaskManager 的 Record Writer 写了 <1,2> 这个两个数据进来,因为 ResultSubPartition 初始化的时候为空,没有 Buffer 用来接收,就会向 Local BufferPool 申请内存,这时 Local BufferPool 也没有足够的内存于是将请求转到 Network BufferPool,最终将申请到的 Buffer 按原链路返还给 ResultSubPartition,<1,2> 这个两个数据就可以被写入了。

之后会将 ResultSubPartition 的 Buffer 拷贝到 Netty 的 Buffer 当中最终拷贝到 Socket 的 Buffer 将消息发送出去。然后接收端按照类似的机制去处理将消息消费掉

3.4 跨 TaskManager 反压过程

当发送端速率为2,接收端速率为1,就会出现反压。
在这里插入图片描述
(1)因为速度不匹配就会导致一段时间后 InputChannel 的 Buffer 被用尽,于是他会向 Local BufferPool 申请新的 Buffer ,这时候可以看到 Local BufferPool 中的一个 Buffer 就会被标记为 Used。

在这里插入图片描述
(2)发送端还在持续以不匹配的速度发送数据,然后就会导致 InputChannel 向 Local BufferPool 申请 Buffer 的时候发现没有可用的 Buffer 了,这时候就只能向 Network BufferPool 去申请,当然每个 Local BufferPool 都有最大的可用的 Buffer,防止一个 Local BufferPool 把 Network BufferPool 耗尽。这时候看到 Network BufferPool 还是有可用的 Buffer 可以向其申请。

在这里插入图片描述
(3)一段时间后,发现 Network BufferPool 没有可用的 Buffer,或是 Local BufferPool 的最大可用 Buffer 到了上限无法向 Network BufferPool 申请,没有办法去读取新的数据,这时 Netty AutoRead 就会被禁掉,Netty 就不会从 Socket 的 Buffer 中读取数据了。

在这里插入图片描述

(4)显然,再过不久 Socket 的 Buffer 也被用尽,这时就会将 Window = 0 发送给发送端(前文提到的 TCP 滑动窗口的机制)。这时发送端的 Socket 就会停止发送。

在这里插入图片描述
(5)很快发送端的 Socket 的 Buffer 也被用尽,Netty 检测到 Socket 无法写了之后就会停止向 Socket 写数据。

在这里插入图片描述

(6)Netty 停止写了之后,所有的数据就会阻塞在 Netty 的 Buffer 当中了,但是 Netty 的 Buffer 是无界的,可以通过 Netty 的水位机制中的 high watermark 控制他的上界。当超过了 high watermark,Netty 就会将其 channel 置为不可写,ResultSubPartition 在写之前都会检测 Netty 是否可写,发现不可写就会停止向 Netty 写数据。

在这里插入图片描述
(7)这时候所有的压力都来到了 ResultSubPartition,和接收端一样他会不断的向 Local BufferPool 和 Network BufferPool 申请内存。

在这里插入图片描述

3.5 TaskManager内反压过程

了解了跨 TaskManager 反压过程后再来看 TaskManager 内反压过程就更好理解了,下游的 TaskManager 反压导致本 TaskManager 的 ResultSubPartition 无法继续写入数据,于是 Record Writer 的写也被阻塞住了,因为 Operator 需要有输入才能有计算后的输出,输入跟输出都是在同一线程执行, Record Writer 阻塞了,Record Reader 也停止从 InputChannel 读数据,这时上游的 TaskManager 还在不断地发送数据,最终将这个 TaskManager 的 Buffer 耗尽。
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

4 Flink Credit-based 反压机制(1.5版本之后)

4.1 TCP反压的弊端

1)单点问题:在一个 TaskManager 中可能要执行多个 Task,如果多个 Task 的数据最终都要传输到下游的同一个 TaskManager 就会复用同一个 Socket 进行传输,这个时候如果单个 Task 产生反压,就会导致复用的 Socket 阻塞,其余的 Task 也无法使用传输,checkpoint barrier 也无法发出导致下游执行 checkpoint 的延迟增大。

2)依赖最底层的 TCP 去做流控,会导致反压传播路径太长,导致生效的延迟比较大。

4.2 引入 Credit-based 反压

这个机制简单的理解起来就是在 Flink 层面实现类似 TCP 流控的反压机制来解决上述的弊端,Credit 可以类比为 TCP 的 Window 机制(向接收端发送当前还能接收数据的最大限制)。

4.3 Credit-based 反压过程

在这里插入图片描述

在 Flink 层面实现反压机制,就是每一次 ResultSubPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息(图上两个 ResultSubPartition 和 InputChannel 之间是虚线是因为最终还是要通过 Netty 和 Socket 去通信),下面我们看一个具体示例。

在这里插入图片描述

假设我们上下游的速度不匹配,上游发送速率为 2,下游接收速率为 1,可以看到图上在 ResultSubPartition 中累积了两条消息,10 和 11, backlog 就为 2,这时就会将发送的数据 <8,9> 和 backlog = 2 一同发送给下游。下游收到了之后就会去计算是否有 2 个 Buffer 去接收,可以看到 InputChannel 中已经不足了这时就会从 Local BufferPool 和 Network BufferPool 申请,好在这个时候 Buffer 还是可以申请到的。

在这里插入图片描述
过了一段时间后由于上游的发送速率要大于下游的接受速率,下游的 TaskManager 的 Buffer 已经到达了申请上限,这时候下游就会向上游返回 Credit = 0,ResultSubPartition 接收到之后就不会向 Netty 去传输数据,上游 TaskManager 的 Buffer 也很快耗尽,达到反压的效果,这样在 ResultSubPartition 层就能感知到反压,不用通过 Socket 和 Netty 一层层地向上反馈,降低了反压生效的延迟。同时也不会将 Socket 去阻塞,解决了由于一个 Task 反压导致 TaskManager 和 TaskManager 之间的 Socket 阻塞的问题

5 总结与思考

5.1 总结

1)网络流控是为了在上下游速度不匹配的情况下,防止下游出现过载。

2)网络流控有静态限速和动态反压两种手段。

3)Flink 1.5 之前是基于 TCP 流控 + bounded buffer 实现反压。

4)Flink 1.5 之后实现了自己托管的 credit - based 流控机制,在应用层模拟 TCP 的流控机制。

5.2 思考

有了动态反压,静态限速是不是完全没有作用了?

在这里插入图片描述
实际上动态反压不是万能的,我们流计算的结果最终是要输出到一个外部的存储(Storage),外部数据存储到 Sink 端的反压是不一定会触发的,这要取决于外部存储的实现,像 Kafka 这样是实现了限流限速的消息中间件可以通过协议将反压反馈给 Sink 端,但是像 ES 无法将反压进行传播反馈给 Sink 端,这种情况下为了防止外部存储在大的数据量下被打爆,我们就可以通过静态限速的方式在 Source 端去做限流

所以说动态反压并不能完全替代静态限速的,需要根据合适的场景去选择处理方案。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/548479
推荐阅读
相关标签
  

闽ICP备14008679号