赞
踩
系列文章目录
第一章 资源配置调优
第二章 状态及Checkpoint 调优
第三章 反压处理
第四章 数据倾斜
第五章 job优化
第六章 flinksql调优
第七章 常见故障排查
FlinkSQL官网配置参数:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/config/
Flink SQL新手有可能犯的错误,其中之一就是 忘记设置空闲状态保留时间导致状态爆炸。列举两个场景:
Flink SQL可以指定空闲状态(即未更新的状态)被保留的最小时间 当状态中某个 key
对应的 状态 未更新的时间达到阈值时,该条状态被自动清理
//API 指定
tableEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));
//参数指定
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.state.ttl", "1h");
MiniBatch是微批处理,原理是 缓存一定的数据后再触发处理,以减少对 State 的访问从而提升吞吐并减少数据的输出量。MiniBatch主要依靠在每个Task上注册的Timer线程来触发微批,需要消耗一定的线程调度性能。
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled", true);
批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency", 5s);
防止 OOM 设置每个批次最多缓存数据的条数 ,可以设为 2 万条
configuration.setString("table.exec.mini batch.size", 20000);
微批处理通过增加延迟换取高吞吐,如果有超低延迟的要求,不建议开启微批处理。通
常对于聚合的场景,微批处理可以显著的提升系统性能,建议开启。
原理概述
LocalGlobal优化将原先的 Aggregate 分成 Local+Global 两阶段聚合,即MapReduce 模型中的 Combine+Reduce 处理模式。第一阶段在上游节点本地攒一批数据进行聚合( localAgg ),并输出这次微批的增量值 A ccumulator )。第二阶段再将收到的 Accumulator 合并( Merge ),得到最终的结果 GlobalAgg )。
LocalGlobal本质上能够靠 LocalAgg 的聚合筛除部分倾斜数据,从而降低 GlobalAgg的热点,提升性能。结合下图理解 LocalGlobal 如何解决数据倾斜的问题。
由上图可知:
LocalGlobal 开启方式:
1)LocalGlobal 优化需要先开启 MiniBatch ,依赖于 MiniBatch 的参数。
2)table.optimizer.agg phase strategy : 聚合策略。默认 AUTO ,支持参数 AUTO 、TWO_PHASE( 使用 LocalGlobal 两阶段聚合 、 ONE_PHASE( 仅使用 Global 一阶段聚合)。
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数:
//开启 miniBatch
configuration.setString("table.exec.mini-batch.enabled",true);
//批量输出的间隔时间
configuration.setString("table.exec.mini-batch.allow-latency ", "5s");
//防止 OOM 设置每个批次最多缓存数据的条数 ,可以设为 2 万条
configuration.setString("table.exec.mini-batch.size", "20000");
// 开启 LocalGlobal
config uration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
注意事项:
1)需要先开启 MiniBatch
2)开启 LocalGlobal 需要 UDAF 实现 Merge 方法 。
提交案例:统计每天每个 mid 出现次数
可以看到存在数据倾斜。
提交案例:开启 miniBatch 和 LocalGlobal
从WebUI 可以看到分组聚合变成了 Local 和 Global 两部分, 数据相对均匀,且没有数据倾斜。
LocalGlobal优化针对普通聚合(例如 SUM 、 COUNT 、 MAX 、 MIN 和 AVG )有较好的效果,对于 DISTINCT 的聚合(如 COUNT DISTINCT 收效不明显,因为 COUNT DISTINCT 在 Local 聚合时,对于 DISTINCT KEY 的去重率不高,导致在 Global 节点仍然存在热点。
原理概述
之前,为了解决COUNT DISTINCT 的热点问题,通常需要手 动改写为两层聚合(增加按 Distinct Key取模的打散层)。
从 Flink1.9.0 版本开始,提供了 COUNT DISTINCT 自动打散功能, 通过HASH_CODE(distinct_key) % BUCKET_NUM 打散,不需要手动重写。Split Distinct 和LocalGlobal 的原理对比参见下图。
Distinct举例
SELECT a,COUNT(DISTINCT b
FROM T
GROUP BY a
手动打散举例
SELECT a,SUM(
FROM (
SELECT a,COUNT(DISTINCT b ) as cnt
FROM T
GROUP BY a,MOD(HASH_ b),
GROUP BY a
Split Distinct 开启方式
默认不开启,使用参数显式开启
// 初始化 table environment
TableEnvironment tEnv = ...
// 获取 tableEnv 的配置对象
Configuration configuration = tEnv.getConfig().getConfiguration();
// 设置参数: 要结合 minibatch 一起 使用
//开启 Split Distinct
configuration.setString(" table.optimizer.distinct agg.split.enabled", "true");
//第一层 打 散 的 bucket 数目
configuration.setString(" table.optimizer.distinct agg.split.bucket num", "1024");
注意事项:
(1)目前不能在包含 UDAF 的 Flink SQL 中使用 Split Distinct 优化方法。
(2)拆分出来的两个 GROUP 聚合还可参与 LocalGlobal 优化。
(3)该功能在 Fl ink1.9.0 版本 及以上版本才支持。
提交案例: count ( 存在热点问题)
提交案例:开启 split distinct
从WebUI 可以看到有两次聚合,而且有 partialFinal 字样,第二次聚合时已经均匀 。
在某些场景下,可能需要从不同维度来统计count distinct )的结果 (比如统计 uv 、app 端的 uv 、 web 端的 uv 可能会使用如下 CASE WHEN 语法 。
SELECT
a,
COUNT(DISTINCT b ) AS total_ b,
COUNT(DISTINCT CASE WHEN c IN (' A ', B ') THEN b ELSE NULL END) AS AB b,
COUNT(DISTINCT CASE WHEN c IN (' C ', D ') THEN b ELSE NULL END) AS CD_b
FROM T
GROUP BY a
在这种情况下,建议使用FILTER 语法 , 目前的 Flink SQL 优化器可以识别同一唯一键上的不同 FILTER 参数。如,在上面的示例中,三个 COUNT DISTINCT 都作用在 b 列上。此时,经过优化器识别后,Flink 可以只使用一个共享状态实例,而不是三个状态实例,可减少状态的大小和对状态的访问。
将上边的CASE WHEN 替换成 FILTER 后 ,如下所示:
SELECT
a,
COUNT(DISTINCT b ) AS b,
COUNT(DISTINCT b ) FILTER (WHERE c IN (' A ', B ')) AS AB_b,
COUNT(DISTINCT b ) FILTER (WHERE c IN (' C ', D ')) AS CD b
FROM T
GROUP BY a
提交案例 :多维 Distinct
提交案例: 使用 Filter
通过WebUI 对比 前 1 0 次 Checkpoint 的大小,可以看到 状态 有所减小。
总结以上的调优参数,代码如下:
// 初始化 table environment TableEnvironment tEnv = ... // 获取 tableEnv 的配置对象 Configuration configuration = tEnv.getConfig().getConfiguration(); // 设置参数: //开启 miniBatch configuration.setString("table.exec.mini batch.enabled", "true"); //批量输出的间隔时间 configuration.setString("table.exec.mini batch.allow latency", "5s"); //防止 OOM 设置每个批次最多缓存数据的条数 ,可以设为 2 万条 configuration.setString("table.exec.mini-batch.size", "20000"); // 开启 LocalGlobal configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); //开启 Split Distinct configuration.setString("table.optimizer.distinct-agg.split.enabled", "true"); //第一层 打 散 的 bucket 数目 configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024"); //指定时区 configuration.setString("table.local-time-zone", "Asia/Shang hai");
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。