当前位置:   article > 正文

Flink(60):Flink中通用ModelUtil工具类_flinkutil

flinkutil

目录

0. 相关文章链接

1. 开发目的

2. 环境依赖

3. 具体代码

4. 具体使用


0. 相关文章链接

Flink文章汇总

1. 开发目的

        在日常的Flink中开发中,基本是在自己电脑的idea工具上进行Flink程序开发,并在本地联通测试环境进行调试(比如测试环境的Kafka等),当在本地调试通过后将代码打包,然后提交到正式环境运行。但在此过程中,因为本地调试和线上运行时使用的配置不同,会造成我们代码上线时需要修改较多配置(比如Checkpoint地址等),并且还不能修改错,不然可能会污染线上数据。由此开发了一个通用的ModelUtil工具类,可以根据不同的运行环境来决定不同的配置,这样上线时就不需要去特意配置了。

        除了本地调试和上线运行外,在我们编写Flink代码时,针对环境变量由较多通用配置,如果每个程序主类都配置一遍,会感觉很麻烦,由此想到,可以将所有通用配置环境放到一个类里面统一配置,根据输入的参数来微调,这样创建Flink执行环境就较为简单了。

2. 环境依赖

具体环境依赖跟Flink依赖一致,可以参考博主另一篇文章: Flink(8):Flink的API说明和pom文件汇总

3. 具体代码

博主使用的环境为华为云,所以线上的checkpoint保存地址为华为云的obs文件系统,另外博主使用的Flink版本为1.10版本

该工具类包括如下4个功能(方法):

  • 根据传入key获取对应配置文件中的配置值
  • 对传入的Flink的流的执行环境配置Checkpoint(filesystem类型)
  • 对传入的Flink的流的执行环境配置Checkpoint(rocksdb类型)
  • 对传入的Flink的流的执行环境配置重启策略
  1. import org.apache.commons.lang3.SystemUtils;
  2. import org.apache.flink.api.common.restartstrategy.RestartStrategies;
  3. import org.apache.flink.api.common.time.Time;
  4. import org.apache.flink.contrib.streaming.state.PredefinedOptions;
  5. import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
  6. import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
  7. import org.apache.flink.runtime.state.filesystem.FsStateBackend;
  8. import org.apache.flink.streaming.api.CheckpointingMode;
  9. import org.apache.flink.streaming.api.environment.CheckpointConfig;
  10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import org.apache.flink.util.TernaryBoolean;
  12. import org.rocksdb.ColumnFamilyOptions;
  13. import org.rocksdb.DBOptions;
  14. import org.slf4j.Logger;
  15. import org.slf4j.LoggerFactory;
  16. import java.util.Collection;
  17. import java.util.ResourceBundle;
  18. import java.util.concurrent.TimeUnit;
  19. /**
  20. * @date: 2021/6/9
  21. * @author: yangshibiao
  22. * @desc: 模块工具类
  23. */
  24. public class ModelUtil {
  25. public static ResourceBundle localConfig = ResourceBundle.getBundle("localConfig");
  26. public static ResourceBundle config = ResourceBundle.getBundle("config");
  27. public static Logger logger = LoggerFactory.getLogger(ModelUtil.class);
  28. /**
  29. * 根据key获取配置值
  30. *
  31. * @param key 配置参数的key
  32. * @return 配置参数的value
  33. */
  34. public static String getConfigValue(String key) {
  35. if (SystemUtils.IS_OS_WINDOWS) {
  36. return localConfig.getString(key);
  37. } else {
  38. return config.getString(key);
  39. }
  40. }
  41. /**
  42. * 对传入的Flink的流的执行环境配置Checkpoint
  43. *
  44. * @param env Flink的流的执行环境
  45. * @param applicationName 应用程序名,会在checkpoint的路径下创建该应用程序的文件夹,用来保存该应用程序的checkpoint
  46. * @param interval checkpoint的时间间隔,单位:毫秒,filesystem模式的checkpoint建议间隔为 10s - 60s 之间的整10数
  47. */
  48. public static void deployFsCheckpoint(StreamExecutionEnvironment env, String applicationName, long interval) {
  49. // 启动checkpoint,设置为精确一次,并通过传入的参数设置时间间隔
  50. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  51. env.getCheckpointConfig().setCheckpointInterval(interval);
  52. // 设置 状态后端 和 checkpoint 为 filesystem 的模式,并通过配置文件指定文件夹(判断是本地调试还是集群环境)
  53. String checkpointPath = null;
  54. if (SystemUtils.IS_OS_WINDOWS) {
  55. checkpointPath = ModelUtil.getConfigValue("fs.checkpoint.path") + applicationName + "\\\\";
  56. } else {
  57. String obsPath = "obs://" +
  58. ModelUtil.getConfigValue("obs.ak") + ":" +
  59. ModelUtil.getConfigValue("obs.sk") + "@" +
  60. ModelUtil.getConfigValue("obs.endpoint") +
  61. ModelUtil.getConfigValue("fs.checkpoint.path");
  62. checkpointPath = obsPath + applicationName + "/";
  63. }
  64. env.setStateBackend(new FsStateBackend(checkpointPath));
  65. // 设置2个checkpoint之间的最小间隔,不需要设置,默认为0
  66. // env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 1000);
  67. // 设置能容忍100个检查点的失败
  68. env.getCheckpointConfig().setTolerableCheckpointFailureNumber(100);
  69. // 当作业被cancel时,不删除外部保存的检查点
  70. env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  71. // 当在设置的时间内还没有保存成功认为该检查点失败,设置为interval的10倍
  72. env.getCheckpointConfig().setCheckpointTimeout(interval * 10);
  73. // 设置同时可以进行10个checkpoint
  74. env.getCheckpointConfig().setMaxConcurrentCheckpoints(10);
  75. logger.info(">>>>> 正在进行环境设置,会创建fs的checkpoint环境,applicationName:" + applicationName + " ; 间隔时间interval:" + interval + " ; ");
  76. }
  77. /**
  78. * 对传入的Flink的流的执行环境配置Checkpoint
  79. *
  80. * @param env Flink的流的执行环境
  81. * @param applicationName 应用程序名,会在checkpoint的路径下创建该应用程序的文件夹,用来保存该应用程序的checkpoint
  82. * @param interval checkpoint的时间间隔,单位:毫秒,RocksDB模式的checkpoint建议间隔为 1分钟到30分钟 之间的整分钟数
  83. */
  84. public static void deployRocksdbCheckpoint(StreamExecutionEnvironment env, String applicationName, long interval) {
  85. // 启动checkpoint,设置为精确一次,并通过传入的参数设置时间间隔
  86. env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  87. env.getCheckpointConfig().setCheckpointInterval(interval);
  88. // 设置 状态后端 和 checkpoint 为 filesystem 的模式,并通过配置文件指定文件夹(判断是本地调试还是集群环境)
  89. String checkpointPath = null;
  90. if (SystemUtils.IS_OS_WINDOWS) {
  91. checkpointPath = ModelUtil.getConfigValue("rocksdb.checkpoint.path") + applicationName + "\\\\";
  92. } else {
  93. String obsPath = "obs://" +
  94. ModelUtil.getConfigValue("obs.ak") + ":" +
  95. ModelUtil.getConfigValue("obs.sk") + "@" +
  96. ModelUtil.getConfigValue("obs.endpoint") +
  97. ModelUtil.getConfigValue("rocksdb.checkpoint.path");
  98. checkpointPath = obsPath + applicationName + "/";
  99. }
  100. RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), TernaryBoolean.TRUE);
  101. // 预定义选项,SPINNING_DISK_OPTIMIZED为基于磁盘的优化,一般使用SPINING_DISK_OPTIMIZED_HIGH_MEM,但这会消耗比较多的内存
  102. rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
  103. rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
  104. @Override
  105. public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
  106. return currentOptions
  107. // 指定信息日志文件的最大大小。 如果当前日志文件大于' max_log_file_size ',一个新的信息日志文件将被创建。如果为0,所有日志将被写入一个日志文件。
  108. .setMaxLogFileSize(64 * 1024 * 1024)
  109. // 信息日志文件的最大保留个数。
  110. // .setKeepLogFileNum(3)
  111. ;
  112. }
  113. @Override
  114. public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
  115. return currentOptions;
  116. }
  117. });
  118. env.setStateBackend(rocksDbBackend);
  119. // 设置2个checkpoint之间的最小间隔,不需要设置,默认为0
  120. // env.getCheckpointConfig().setMinPauseBetweenCheckpoints(interval);
  121. // 设置能容忍100个检查点的失败
  122. env.getCheckpointConfig().setTolerableCheckpointFailureNumber(100);
  123. // 当作业被cancel时,不删除外部保存的检查点
  124. env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  125. // 当在设置的时间内还没有保存成功认为该检查点失败,设置为interval的10倍
  126. env.getCheckpointConfig().setCheckpointTimeout(interval * 10);
  127. // 设置同时可以进行10个checkpoint
  128. env.getCheckpointConfig().setMaxConcurrentCheckpoints(10);
  129. logger.info(">>>>> 正在进行环境设置,会创建rocksdb的checkpoint环境,applicationName:" + applicationName + " ; 间隔时间interval:" + interval + " ; ");
  130. }
  131. /**
  132. * 对传入的Flink的流的执行环境配置重启策略
  133. *
  134. * @param env Flink的流的执行环境
  135. */
  136. public static void deployRestartStrategy(StreamExecutionEnvironment env) {
  137. // 当任务中异常失败后,会重启任务3次,间隔时间为60秒
  138. env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(60, TimeUnit.SECONDS)));
  139. logger.info(">>>>> 正在进行环境设置,重启策略为:重启任务3次,间隔时间为60秒");
  140. // 10分钟内重启5次,每次间隔2分钟(排除了网络等问题,如果再失败,需要手动查明原因)
  141. // env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(10, TimeUnit.MINUTES), Time.of(2, TimeUnit.MINUTES)));
  142. // logger.info(">>>>> 正在进行环境设置,重启策略为:10分钟内重启5次,每次间隔2分钟");
  143. }
  144. }

4. 具体使用

如下代码所示,在具体使用中,通过简短的4行代码既可以配置出Flink的运行环境:

  1. public static void main(String[] args) throws Exception {
  2. // 包括但不限于kafka的消费者id
  3. String applicationName = "应用名";
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  6. ModelUtil.deployRocksdbCheckpoint(env, applicationName, 3 * 1000);
  7. ModelUtil.deployRestartStrategy(env);
  8. // 具体业务代码
  9. env.execute(applicationName);
  10. }

在resources文件夹下,需要配置出本地运行环境和线上运行环境的不同的配置选项,具体文件如下图所示:

线上运行配置如下图所示:

本地运行配置如下图所示:


注:其他相关文章链接由此进 -> Flink文章汇总


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/730540
推荐阅读
相关标签
  

闽ICP备14008679号