
Flink Java Utility Classes


Flink execution environment utility

```java
public class ExecutionEnvUtil {

    /**
     * Read configuration (effective priority: properties file < command-line arguments < system properties;
     * later sources override earlier ones).
     *
     * @param args command-line arguments
     * @return org.apache.flink.api.java.utils.ParameterTool
     * @date 2023/8/4 - 10:05 AM
     */
    public static ParameterTool createParameterTool(final String[] args) throws Exception {
        return ParameterTool
                .fromPropertiesFile(ExecutionEnvUtil.class.getResourceAsStream(BaseConstants.PROPERTIES_FILE_NAME))
                .mergeWith(ParameterTool.fromArgs(args))
                .mergeWith(ParameterTool.fromSystemProperties());
    }

    /**
     * Configure the Flink streaming environment.
     *
     * @param parameterTool merged job parameters
     * @return org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
     * @date 2023/8/4 - 11:10 AM
     */
    public static StreamExecutionEnvironment prepare(ParameterTool parameterTool) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parameterTool.getInt(PropertiesConstants.STREAM_PARALLELISM, 12));
        // Restart the job at most 4 times, waiting 60 s between attempts
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, Time.seconds(60)));
        if (parameterTool.getBoolean(PropertiesConstants.STREAM_CHECKPOINT_ENABLE, true)) {
            CheckPointUtil.setCheckpointConfig(env, parameterTool);
            // Retain externalized checkpoint data when the job is cancelled
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        }
        // Make the parameters readable from every operator's runtime context
        env.getConfig().setGlobalJobParameters(parameterTool);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        return env;
    }
}
```
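Because the ParameterTool is registered as global job parameters, any rich function can read it back at runtime. A minimal sketch of that documented Flink pattern (the class name and the `demo.prefix` key are illustrative, not part of the utilities above):

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;

public class PrefixFunction extends RichMapFunction<String, String> {

    private transient String prefix;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Recover the ParameterTool registered via env.getConfig().setGlobalJobParameters(...)
        ParameterTool parameterTool = (ParameterTool) getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();
        // "demo.prefix" is only an illustrative property name
        this.prefix = parameterTool.get("demo.prefix", "");
    }

    @Override
    public String map(String value) {
        return prefix + value;
    }
}
```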

Checkpoint utility

```java
public class CheckPointUtil {

    private static final String CHECKPOINT_MEMORY = "memory";
    private static final String CHECKPOINT_FS = "fs";
    private static final String CHECKPOINT_ROCKSDB = "rocksdb";
    /**
     * Default checkpoint storage type
     */
    private static final String CHECKPOINT_DEFAULT = "default";

    /**
     * Configure Flink checkpointing.
     *
     * @param env           the streaming environment
     * @param parameterTool merged job parameters
     * @return org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
     * @date 2023/8/4 - 10:49 AM
     */
    public static StreamExecutionEnvironment setCheckpointConfig(StreamExecutionEnvironment env, ParameterTool parameterTool) throws Exception {
        // Choose a state backend according to the configured type
        String stateBackendType = parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_TYPE, CHECKPOINT_DEFAULT);
        if (CHECKPOINT_MEMORY.equalsIgnoreCase(stateBackendType)) {
            // 1. State kept in memory; the default max size is 5 MB, raised here to 500 MB
            StateBackend stateBackend = new MemoryStateBackend(5 * 1024 * 1024 * 100);
            env.setStateBackend(stateBackend);
        } else if (CHECKPOINT_FS.equalsIgnoreCase(stateBackendType)) {
            // 2. State kept on a file system (checkpoint directory taken from configuration)
            StateBackend stateBackend = new FsStateBackend(new URI(parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_DIR)), 0, true);
            env.setStateBackend(stateBackend);
        } else if (CHECKPOINT_ROCKSDB.equalsIgnoreCase(stateBackendType)) {
            // 3. State kept in RocksDB, with incremental checkpoints enabled
            RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_DIR), true);
            env.setStateBackend(rocksDBStateBackend);
        }
        // Checkpoint interval
        env.enableCheckpointing(parameterTool.getLong(PropertiesConstants.STREAM_CHECKPOINT_INTERVAL, 60000));
        // Advanced settings (these are also best read from the properties file, preferring environment variables)
        // Exactly-once mode
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // At least 2 minutes between the end of one checkpoint and the start of the next
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60000);
        // A checkpoint must complete within 15 minutes or it is discarded
        env.getCheckpointConfig().setCheckpointTimeout(15 * 60000);
        // Tolerate up to 3 consecutive checkpoint failures before failing the job
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
        // At most 1 checkpoint in flight at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        return env;
    }
}
```
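Wired together, the two utilities give a very small job entry point. A minimal sketch (the class name and the placeholder pipeline are illustrative; only createParameterTool and prepare come from the utilities above):

```java
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DemoJob {

    public static void main(String[] args) throws Exception {
        // Merge the properties file, command-line arguments and system properties
        ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
        // Applies parallelism, the restart strategy and the checkpoint settings above
        StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);

        env.fromElements("a", "b", "c")  // placeholder source
           .print();

        env.execute("demo-job");
    }
}
```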

Building the Kafka source and sink

```java
/**
 * Build the Kafka source.
 *
 * @param parameterTool merged job parameters
 * @return org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer<java.lang.String>
 * @date 2023/8/4 - 2:41 PM
 */
private static FlinkKafkaConsumer<String> buildSourceKafka(ParameterTool parameterTool) {
    Properties props = KafkaConfigUtil.buildSourceKafkaProps(parameterTool);
    // Subscribe to topics by regular expression
    FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
            Pattern.compile(parameterTool.get(PropertiesConstants.KAFKA_SOURCE_TOPIC)),
            new SimpleStringSchema(),
            props);
    // Commit offsets back to Kafka when checkpoints complete
    kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
    // Optionally start consuming from the earliest offsets
    if (parameterTool.getBoolean(PropertiesConstants.KAFKA_START_FROM_FIRST, false)) {
        kafkaConsumer.setStartFromEarliest();
    } else {
        kafkaConsumer.setStartFromGroupOffsets();
    }
    return kafkaConsumer;
}

/**
 * Build the Kafka sink.
 *
 * @param parameterTool merged job parameters
 * @return org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<com.alibaba.fastjson.JSONObject>
 * @date 2023/8/16 - 11:38 AM
 */
private static FlinkKafkaProducer<JSONObject> buildSinkKafka(ParameterTool parameterTool) {
    Properties props = KafkaConfigUtil.buildSinkKafkaProps(parameterTool);
    // Route each record to the topic named by its log-type field; the configured topic is the default
    return new FlinkKafkaProducer<>(parameterTool.get(PropertiesConstants.KAFKA_SINK_DEFAULT_TOPIC),
            (KafkaSerializationSchema<JSONObject>) (element, timestamp) ->
                    new ProducerRecord<>(element.getString(BaseConstants.PARAM_LOG_TYPE), element.toJSONString().getBytes()),
            props,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}
```
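With both builders in place, the source and sink plug into the prepared environment in the usual way. A minimal sketch, assuming a fastjson parse step between them (the operator names are illustrative, and the usual DataStream and fastjson imports are assumed):

```java
// Inside the job's main(), after env = ExecutionEnvUtil.prepare(parameterTool)
DataStream<String> raw = env
        .addSource(buildSourceKafka(parameterTool))
        .name("kafka-source");

DataStream<JSONObject> parsed = raw
        .map(value -> JSON.parseObject(value))
        .returns(JSONObject.class)
        .name("parse-json");

parsed.addSink(buildSinkKafka(parameterTool))
      .name("kafka-sink");
```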

Kafka configuration utility

```java
public class KafkaConfigUtil {

    /**
     * Build the Kafka consumer (source) configuration.
     *
     * @param parameterTool merged job parameters
     * @return java.util.Properties
     * @date 2023/8/4 - 2:39 PM
     */
    public static Properties buildSourceKafkaProps(ParameterTool parameterTool) {
        Properties props = parameterTool.getProperties();
        props.put("bootstrap.servers", parameterTool.get(PropertiesConstants.KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS));
        props.put("group.id", parameterTool.get(PropertiesConstants.KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID));
        // Discover new partitions of the subscribed topics every 10 s
        props.put("flink.partition-discovery.interval-millis", "10000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // 0817 - request timeout and retry count when consuming from Kafka
        props.put("request.timeout.ms", "30000");
        props.put("retries", 5);
        return props;
    }

    /**
     * Build the Kafka producer (sink) configuration.
     *
     * @param parameterTool merged job parameters
     * @return java.util.Properties
     * @date 2023/8/14 - 5:54 PM
     */
    public static Properties buildSinkKafkaProps(ParameterTool parameterTool) {
        Properties props = parameterTool.getProperties();
        props.put("bootstrap.servers", parameterTool.get(PropertiesConstants.KAFKA_SINK_BROKERS));
        // Record serialization is handled by the KafkaSerializationSchema in buildSinkKafka,
        // so no serializer classes need to be configured here
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.setProperty(ProducerConfig.RETRIES_CONFIG, "5");
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "300");
        return props;
    }
}
```
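Both builders start from parameterTool.getProperties(), so any extra Kafka client property placed in the job's properties file is carried through to the consumer or producer as-is, while the explicit put(...) calls above always win. A quick illustration (the max.poll.records entry is a hypothetical example):

```java
// Suppose the properties file read by ExecutionEnvUtil.createParameterTool contains:
//   max.poll.records=200
Properties props = KafkaConfigUtil.buildSourceKafkaProps(parameterTool);

props.getProperty("max.poll.records");   // "200" - copied through from the properties file
props.getProperty("auto.offset.reset");  // "latest" - the explicit put(...) overrides any file value
```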

JDBC datasource utility

```java
public class JdbcDatasourceUtils {

    public static volatile Map<String, HikariDataSource> DATASOURCES = new ConcurrentHashMap<>();

    /**
     * Get (or lazily create) a HikariCP connection pool for the given connection settings.
     *
     * @param jdbcUrl  JDBC URL
     * @param dsUname  user name
     * @param dsPwd    password
     * @param dsDriver driver class name
     * @return com.zaxxer.hikari.HikariDataSource
     * @date 2023/8/9 - 2:23 PM
     */
    public static HikariDataSource getHikariDataSource(String jdbcUrl, String dsUname, String dsPwd, String dsDriver) {
        // Cache pools by an MD5 of the connection settings (double-checked locking)
        String md5Key = Md5Util.encrypt(jdbcUrl + " " + dsUname + " " + dsPwd + " " + dsDriver);
        if (!DATASOURCES.containsKey(md5Key)) {
            synchronized (JdbcDatasourceUtils.class) {
                if (!DATASOURCES.containsKey(md5Key)) {
                    DATASOURCES.put(md5Key, createHikariDataSource(jdbcUrl, dsUname, dsPwd, dsDriver));
                }
            }
        }
        return DATASOURCES.get(md5Key);
    }

    /**
     * Build a HikariCP connection pool.
     *
     * @param jdbcUrl  JDBC URL
     * @param dsUname  user name
     * @param dsPwd    password
     * @param dsDriver driver class name
     * @return com.zaxxer.hikari.HikariDataSource
     * @date 2023/8/9 - 2:14 PM
     */
    private static HikariDataSource createHikariDataSource(String jdbcUrl, String dsUname, String dsPwd, String dsDriver) {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(jdbcUrl);
        config.setUsername(dsUname);
        config.setPassword(dsPwd);
        config.setDriverClassName(dsDriver);
        // Auto-commit for connections returned from the pool (HikariCP default: true)
        config.setAutoCommit(true);
        // Read-only connections
        config.setReadOnly(true);
        // Connection timeout in milliseconds (HikariCP default: 30 s)
        config.setConnectionTimeout(10000);
        // Maximum pool size
        config.setMaximumPoolSize(32);
        // Minimum number of idle connections
        config.setMinimumIdle(16);
        // Idle connection timeout (note: HikariCP ignores this unless it is smaller than maxLifetime)
        config.setIdleTimeout(600000);
        // Maximum connection lifetime
        config.setMaxLifetime(540000);
        // Connection test query
        config.setConnectionTestQuery("SELECT 1");
        return new HikariDataSource(config);
    }

    /**
     * Load rows as a list of column-label -> value maps.
     *
     * @param dataSource connection pool
     * @param sql        query to execute
     * @return java.util.List<java.util.Map<java.lang.String, java.lang.Object>>
     * @date 2023/8/15 - 6:03 PM
     */
    public static List<Map<String, Object>> loadDatas(HikariDataSource dataSource, String sql) {
        return loadSql(dataSource, sql, resultSet -> {
            List<Map<String, Object>> datas = new ArrayList<>();
            try {
                if (null == resultSet) {
                    return datas;
                }
                ResultSetMetaData metaData = resultSet.getMetaData();
                // Assemble one map per row
                Map<String, Object> entry;
                while (resultSet.next()) {
                    entry = new LinkedHashMap<>();
                    // getColumnLabel returns the alias, getColumnName the original column name
                    for (int i = 1; i <= metaData.getColumnCount(); i++) {
                        entry.put(metaData.getColumnLabel(i), resultSet.getObject(i));
                    }
                    datas.add(entry);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return datas;
        });
    }

    /**
     * Load data into a set, applying a mapping function to every column value.
     *
     * @param dataSource connection pool
     * @param sql        query to execute
     * @param function   mapping applied to each column value
     * @return java.util.Set<R>
     * @date 2023/8/15 - 6:03 PM
     */
    public static <R> Set<R> loadSetDatas(HikariDataSource dataSource, String sql, Function<Object, R> function) {
        return loadSql(dataSource, sql, resultSet -> {
            Set<R> datas = new LinkedHashSet<>();
            try {
                if (null == resultSet) {
                    return datas;
                }
                ResultSetMetaData metaData = resultSet.getMetaData();
                while (resultSet.next()) {
                    for (int i = 1; i <= metaData.getColumnCount(); i++) {
                        datas.add(function.apply(resultSet.getObject(i)));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return datas;
        });
    }

    /**
     * Execute a query and hand the ResultSet to the given function.
     *
     * @param dataSource connection pool
     * @param sql        query to execute
     * @param function   result handler; receives null when the query fails
     * @return R
     * @date 2023/8/15 - 6:03 PM
     */
    private static <R> R loadSql(HikariDataSource dataSource, String sql, Function<ResultSet, R> function) {
        // try-with-resources closes the ResultSet, statement and connection in reverse order
        try (Connection connection = dataSource.getConnection();
             PreparedStatement preparedStatement = connection.prepareStatement(sql);
             ResultSet resultSet = preparedStatement.executeQuery()) {
            return function.apply(resultSet);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return function.apply(null);
    }
}
```
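A typical use of these helpers inside a Flink job is to load a small dimension table once per task in a rich function's open(). A minimal sketch, where the JDBC URL, credentials, SQL and field names are all illustrative:

```java
import com.alibaba.fastjson.JSONObject;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.util.Set;

public class IpWhitelistFunction extends RichMapFunction<JSONObject, JSONObject> {

    private transient Set<String> whitelist;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Pools are cached per connection settings, so repeated calls reuse the same HikariDataSource
        HikariDataSource dataSource = JdbcDatasourceUtils.getHikariDataSource(
                "jdbc:mysql://127.0.0.1:3306/demo",          // illustrative URL
                "user", "password", "com.mysql.cj.jdbc.Driver");
        // Every column value of every row is mapped through String::valueOf and collected into a Set
        whitelist = JdbcDatasourceUtils.loadSetDatas(dataSource, "SELECT ip FROM ip_whitelist", String::valueOf);
    }

    @Override
    public JSONObject map(JSONObject value) {
        value.put("whitelisted", whitelist.contains(value.getString("ip")));
        return value;
    }
}
```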
