当前位置:   article > 正文

flink调优

flink调优

一、 Checkpoint 设置
1、Checkpoint 间隔不要太短

虽然理论上 Flink 支持很短的 checkpoint 间隔,但是在实际生产中,过短的间隔对于底层分布式文件系统而言,会带来很大的压力。另一方面,由于检查点的语义,所以实际上 Flink 作业处理 record 与执行 checkpoint 存在互斥锁,过于频繁的 checkpoint,可能会影响整体的性能。当然,这个建议的出发点是底层分布式文件系统的压力考虑。

2、 合理设置超时时间

默认的超时时间是 10min,如果 state 规模大,则需要合理配置。最坏情况是分布式地创建速度大于单点(job master 端)的删除速度,导致整体存储集群可用空间压力较大。建议当检查点频繁因为超时而失败时,增大超时时间。
二、资源配置

1、并行度(parallelism):保证足够的并行度,并行度也不是越大越好,太多会加重数据在多个solt/task manager之间数据传输压力,包括序列化和反序列化带来的压力。

2、CPU:CPU资源是task manager上的solt共享的,注意监控CPU的使用。

3、内存:内存是分solt隔离使用的,注意存储大state的时候,内存要足够。

4、网络:大数据处理,flink节点之间数据传输会很多,服务器网卡尽量使用万兆网卡。
三、Flink 异步 IO
四、Operator Chain

为了更高效地分布式执行,Flink 会尽可能地将 operator 的 subtask 链接(chain)在一起形成 task,每个 task 在一个线程中执行。将 operators 链接成 task 是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。

Flink 会在生成 JobGraph 阶段,将代码中可以优化的算子优化成一个算子链(Operator Chains)以放到一个 task(一个线程)中执行,以减少线程之间的切换和缓冲的开销,提高整体的吞吐量和延迟。下面以官网中的例子进行说明。
在这里插入图片描述
上图中,source、map、[keyBy|window|apply]、sink 算子的并行度分别是 2、2、2、1,经过 Flink 优化后,source 和 map 算子组成一个算子链,作为一个 task 运行在一个线程上,其简图如图中 condensed view 所示,并行图如 parallelized view 所示。算子之间是否可以组成一个Operator Chains,看是否满足以下条件:

  • 上下游算子的并行度一致;
  • 上下游节点都在同一个slot group 中;
  • 下游节点的chain策略为ALWAYS;
  • 上游节点的chain策略为ALWAYS或HEAD;
  • 两个节点间数据分区方式是forward;
  • 用户没有禁用chain。

五、Slot Sharing

Slot Sharing 是指,来自同一个 Job 且拥有相同 slotSharingGroup(默认:default)名称的不同 Task 的 SubTask 之间可以共享一个 Slot,这使得一个 Slot 有机会持有 Job 的一整条 Pipeline

六、任务消息处理能力视角

对于 Flink 任务消息处理能力分析,我们主要是看实时任务消费的数据源单位时间的输入,和实时任务各个 Operator / Task 消息处理能力是否匹配。Operator 是 Flink 任务的一个算子,Task 则是一个或者多个算子 Chain 起来后,一起执行的物理载体。

数据源我们内部一般使用 Kafka,Kafka Topic 的单位时间输入可以通过调用 Kafka Broker JMX 指标接口进行获取,当然你也可以调用 Flink Rest Monitoring 相关 API 获取实时任务所有 Kafka Source Task 单位时间输入,然后相加即可。不过由于反压可能会对 Source 端的输入有影响,这里我们是直接使用 Kafka Broker 指标 JMX 接口获取 Kafka Topic 单位时间输入。

在获取到实时任务 Kafka Topic 单位时间输入后,下面就是判断实时任务的消息处理能力是否与数据源输入匹配。一个实时任务整体的消息处理能力,会受到处理最慢的 Operator / Task 的影响。打个比方,Flink 任务消费的 Kafka Topic 输入为 20000 Record / S,但是有一个 Map 算子,其并发度为 10 ,Map 算子中业务方调用了 Dubbo,一个 Dubbo 接口从请求到返回为 10 ms,那么 Map 算子处理能力 1000 Record / S (1000 ms / 10 ms * 10 ),从而实时任务处理能力会下降为 1000 Record / S。

由于一条消息记录的处理会在一个 Task 内部流转,所以我们试图找出一个实时任务中,处理最慢的 Task 逻辑。如果 Source 端到 Sink 端全部 Chain 起来的话,我们则是会找出处理最慢的 Operator 的逻辑。在源码层,我们针对 Flink Task 以及 Operator 增加了单条记录处理时间的自定义 Metric,之后该 Metric 可以通过 Flink Rest API 获取。我们会遍历一个 Flink 任务中所有的 Task , 查询处理最慢的 Task 所在的 JobVertex(JobGraph 的点),然后获取到该 JobVertex 所有 Task 的总输出,最终会和 Kafka Topic 单位时间输入进行比对,判断实时任务消息处理能力是否合理。

设实时任务 Kafka Topic 单位时间的

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/942521
推荐阅读
相关标签
  

闽ICP备14008679号