赞
踩
提示:实时flink sql 参考很多网上方法与自己实践方法汇总(版本:flink1.13+)
//关闭详细算子链(默认为true),true后job性能会略微有提升。false则可以展示更详细的DAG图方便地位性能结点 ###有用的参数 pipeline.operator-chaining: 'true' //指定时区 ###实用的参数 table.local-time-zone: Asia/Shanghai //对flink sql是否要敏感大小(建议false,不区分大小写。默认为true) table.identifier-case-sensitive: 'false' //开启 miniBatch table.exec.mini-batch.enabled: 'true' //批量输出的间隔时间 table.exec.mini-batch.allow-latency: 5s //防止 OOM 设置每个批次最多缓存数据的条数 table.exec.mini-batch.size: '500' //提交批次数据大小 batchSize: '127108864' //刷数据间隔 flushIntervalMs: '60000' //几个flush线程 numFlushThreads: '5' // 写odps时压缩 :https://help.aliyun.com/zh/flink/developer-reference/maxcompute-connector compressAlgorithm: snappy //开启异步状态后端 state.backend.async: 'true' //状态后端开启增量(默认就是true 增量) state.backend.incremental: 'true' //作业链与处理槽共享组(默认为false),开启后在针对某个操作算子增加并行度和cu等资源时,不与其他槽位共享资源,单独增加额外资源 ###有用的参数 table.exec.split-slot-sharing-group-per-vertex: 'true' //Checkpoint间隔时间,单位为毫秒 默认180秒 ###如果作业量大,可以适当调大间隔时间。性能方便略有提升 execution.checkpointing.interval: 180s //State数据的生命周期,单位为毫秒。默认36小时 table.exec.state.ttl: 129600000 //Checkpoint生成超时时间(默认值10分钟),当Checkpoint生成时间超过10分钟,flink会把创建生成的Checkpoint杀掉,重新再创建生成Checkpoint。如果观察自己的job生成时间过长减少被杀死Checkpoint可以调大下面时间 ###有用的参数 execution.checkpointing.timeout :10min
// 初始化 table environment TableEnvironment tEnv = ... // 获取 tableEnv 的配置对象 Configuration configuration = tEnv.getConfig().getConfiguration(); // 设置参数: // 开启 miniBatch configuration.setString("table.exec.mini-batch.enabled", "true"); // 批量输出的间隔时间 configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); // 防止 OOM 设置每个批次最多缓存数据的条数,可以设为 2 万条 configuration.setString("table.exec.mini-batch.size", "20000"); // 开启 LocalGlobal(job有聚合函数使用) configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // 开启 Split Distinct (job有聚合函数使用) configuration.setString("table.optimizer.distinct-agg.split.enabled", "true"); // 第一层打散的 bucket 数目 (job有聚合函数使用) configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024"); // TopN 的缓存条数 (job有分组top使用) configuration.setString("table.exec.topn.cache-size", "200000"); // 指定时区 configuration.setString("table.local-time-zone", "Asia/Shanghai");
5).全量Checkpoint与增量Checkpoint的大小一致,是否正常?
如果您在使用Flink的情况下,观察到全量Checkpoint与增量Checkpoint的大小一致:
增量Checkpoint通常是从第二个Checkpoint开始体现出来的,在数据稳定输入且没有大规模的状态变更时,后续的增量Checkpoint应该显示出大小上的差异,表明系统正常地只对状态的增量部分进行快照。如果仍然一致,则需要进一步审查系统状态和行为,确认是否存在问题。
当上面添加配置性能还是不行是,可以增加资源。
*上面只是建议给taskmanager 1cup,4Gb内存起,原因现在很多平台大多是云虚拟资源,这样分配性能较好,同时也是养成良好习惯。
不是所有job资源越堆越多好。有时作业的复杂或数据的特殊情况(外部系统性能除外,例如写数据库),增加资源只会让job性能越来越差或报错(亲身经历job性能差,特别痛苦,一直加资源性能还是差或运行报错)。需要不断找根源问题,多使用不同方法测试才能找到适合job的处理性能。
*上面案例思想:
A.减少CP生成时间。flink才能快速处理数据(提交完已处理的偏移量数据,快速进行下一轮的新数据)。
B.在有回撤流,需要状态(自己观察在一个并发时CP较大几百兆,一般join情况出现的比较多)将多个并发尽量放到一个slot,减少数据传输和交换(一个槽位共享状态)。其他简单的job没有或很少回撤流的情况下可以只建一个槽位。
C.增加并行度会导致CP增大。原因之前一个线程一个CP,现在是多个线程有自己的状态(可能会有重复数据状态),多个状态合在一起CP就大了。
参考:文档1
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。