赞
踩
Flink 性能调优的第一步,就是为任务分配合适的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。
提交方式主要是 yarn-per-job,资源的分配在使用脚本提交 Flink 任务时进行指定。
生产资源配置
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \ 指定并行度
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=2048mb \
-Dtaskmanager.memory.process.size=6144mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.app.dwd.LogBaseApp \
/opt/module/gmall-flink/gmall-realtime-1.0-SNAPSHOT-jar-with-dependencies.jar
Flink 是实时流处理,关键在于资源情况能不能抗住高峰时期每秒的数据量,通常用QPS/TPS 来描述数据情况。
最优并行度计算: 总 QPS/单并行度的处理能力 = 并行度
最好根据高峰期的 QPS 压测,并行度*1.2 倍,富余一些资源。
数据源端是 Kafka,Source 的并行度设置为 Kafka 对应 Topic 的分区数。
如果已经等于 Kafka 的分区数,消费速度仍跟不上数据生产速度。考虑下 Kafka 要扩大分区,同时调大并行度等于分区数。
- Keyby 之前的算子
一般不会做太重的操作,都是比如 map、filter、flatmap 等处理较快的算子,并行度可以和 source 保持一致。- Keyby 之后的算子
如果并发较大,建议设置并行度为 2 的整数次幂,例如:128、256、512;
小并发任务的并行度不一定需要设置成 2 的整数次幂;
大并发任务如果没有 KeyBy,并行度也无需设置为 2 的整数次幂;
Sink 端是数据流向下游的地方,可以根据 Sink 端的数据量及下游的服务抗压能力进行评估。
如果 Sink 端是 Kafka,可以设为 Kafka 对应 Topic 的分区数。
Sink 端的数据量小,比较常见的就是监控告警的场景,并行度可以设置的小一些。
Source 端的数据量是最小的,拿到 Source 端流过来的数据后做了细粒度的拆分,数据量不断的增加,到 Sink 端的数据量就非常大。那么在 Sink 到下游的存储中间件的时候就需要提高并行度。
另外 Sink 端要与下游的服务进行交互,并行度还得根据下游的服务抗压能力来设置,
如果在 Flink Sink 这端的数据量过大的话,且 Sink 处并行度也设置的很大,但下游的服务完全撑不住这么大的并发写入,可能会造成下游服务直接被写挂,所以最终还是要在 Sink 处的并行度做一定的权衡。
RocksDB 是基于 LSM Tree 实现的(类似 HBase),写数据都是先缓存到内存中,所以 RocksDB 的写请求效率比较高。RocksDB 使用内存结合磁盘的方式来存储数据,每次获取数据时,先从内存中 blockcache 中查找,如果内存中没有再去磁盘中查询。优化后差不多单并行度 TPS 5000 record/s,性能瓶颈主要在于 RocksDB 对磁盘的读请求,所以当处理性能不够时,仅需要横向扩展并行度即可提高整个 Job 的吞吐量。以下几个调优参数:
设置本地 RocksDB 多目录
在 flink-conf.yaml 中配置:state.backend.rocksdb.localdir
: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb
注意:不要配置单块磁盘的多个目录,务必将目录配置到多块不同的磁盘上,让多块磁盘来分担压力
state.backend.incremental:开启增量检查点,默认 false,改为 true。
state.backend.rocksdb.predefined-options:
SPINNING_DISK_OPTIMIZED_HIGH_MEM 设置为机械硬盘+内存模式,有条件上SSD,指定为 FLASH_SSD_OPTIMIZED
state.backend.rocksdb.block.cache-size: 整 个 RocksDB 共 享 一 个 block cache,读数据时内存的 cache 大小,该参数越大读数据时缓存命中率越高,默认大小为 8 MB,建议设置到 64 ~ 256 MB。
state.backend.rocksdb.thread.num: 用于后台 flush 和合并 sst 文件的线程数,默认为 1,建议调大,机械硬盘用户可以改为 4 等更大的值。
state.backend.rocksdb.writebuffer.size: RocksDB 中,每个 State 使用一个Column Family,每个 Column Family 使用独占的 write buffer,建议调大,例如:32M
state.backend.rocksdb.writebuffer.count: 每 个 Column Family 对 应 的writebuffer 数目,默认值是 2,对于机械磁盘来说,如果内存⾜够大,可以调大到 5 左右
state.backend.rocksdb.writebuffer.number-to-merge: 将数据从 writebuffer中 flush 到磁盘时,需要合并的 writebuffer 数量,默认值为 1,可以调成 3。
. state.backend.local-recovery: 设置本地恢复,当 Flink 任务失败时,可以基于本地的状态信息进行恢复任务,可能不需要从 hdfs 拉取数据
一般我们的 Checkpoint 时间间隔可以设置为分钟级别,例如 1 分钟、3 分钟,对于状态很大的任务每次 Checkpoint 访问 HDFS 比较耗时,可以设置为 5~10 分钟一次Checkpoint,并且调大两次 Checkpoint 之间的暂停间隔,例如设置两次 Checkpoint 之间至少暂停 4 或 8 分钟。
如果 Checkpoint 语义配置为 EXACTLY_ONCE,那么在 Checkpoint 过程中还会存在 barrier 对齐的过程,可以通过 Flink Web UI 的 Checkpoint 选项卡来查看Checkpoint 过程中各阶段的耗时情况,从而确定到底是哪个阶段导致 Checkpoint 时间过长然后针对性的解决问题。
// 使⽤ RocksDBStateBackend 做为状态后端,并开启增量 Checkpoint
RocksDBStateBackend rocksDBStateBackend = new
RocksDBStateBackend("hdfs://hadoop102:8020/flink/checkpoints", true);
env.setStateBackend(rocksDBStateBackend);
// 开启 Checkpoint,间隔为 3 分钟
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(3));
// 配置 Checkpoint
CheckpointConfig checkpointConf = env.getCheckpointConfig();
checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 最小间隔 4 分钟
checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(4))
// 超时时间 10 分钟
checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
// 保存 checkpoint
checkpointConf.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Flink 中可以通过使用 ParameterTool 类读取配置,它可以读取环境变量、运行参数、配置文件。
ParameterTool 是可序列化的,可以将它当作参数进行传递给算子的自定义函数类。
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String myJobname = parameterTool.get("jobname"); //参数名对应
ParameterTool parameterTool = ParameterTool.fromSystemProperties();
// 读取系统属性
System.out.println(parameterTool.toMap().toString());
// 读取配置文件
ParameterTool.fromPropertiesFile(“/application.properties”);
// 注册全局参数
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
env.execute(myJobname);
先在 kafka 中积压数据,之后开启 Flink 任务,出现反压,就是处理瓶颈。相当于水库先积水,一下子泄洪。数据可以是自己造的模拟数据,也可以是生产中的部分数据
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。