赞
踩
flink 环境构建工具类
- public class ExecutionEnvUtil {
-
- /**
- * 从配置文件中读取配置(生效优先级:配置文件<命令行参数<系统参数)
- *
- * @param args
- * @return org.apache.flink.api.java.utils.ParameterTool
- * @date 2023/8/4 - 10:05 AM
- */
- public static ParameterTool createParameterTool(final String[] args) throws Exception {
- return ParameterTool
- .fromPropertiesFile(ExecutionEnvUtil.class.getResourceAsStream(BaseConstants.PROPERTIES_FILE_NAME))
- .mergeWith(ParameterTool.fromArgs(args))
- .mergeWith(ParameterTool.fromSystemProperties());
- }
-
-
- /**
- * flink 环境配置
- *
- * @param parameterTool
- * @return org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
- * @date 2023/8/4 - 11:10 AM
- */
- public static StreamExecutionEnvironment prepare(ParameterTool parameterTool) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(parameterTool.getInt(PropertiesConstants.STREAM_PARALLELISM, 12));
- env.getConfig();
- env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, Time.seconds(60)));
- if (parameterTool.getBoolean(PropertiesConstants.STREAM_CHECKPOINT_ENABLE, true)) {
- CheckPointUtil.setCheckpointConfig(env,parameterTool);
- // 取消作业时保留外部化 Checkpoint 数据
- env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- }
- env.getConfig().setGlobalJobParameters(parameterTool);
- env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
- return env;
- }
- }
checkpoint 工具类
- public class CheckPointUtil {
-
- private static final String CHECKPOINT_MEMORY = "memory";
- private static final String CHECKPOINT_FS = "fs";
- private static final String CHECKPOINT_ROCKETSDB = "rocksdb";
- /**
- * 默认的checkpoint 存储地址
- */
- private static final String CHECKPOINT_DEFAULT = "default";
-
-
- /**
- * 设置flink check point
- *
- * @param env
- * @param parameterTool
- * @return org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
- * @date 2023/8/4 - 10:49 AM
- */
- public static StreamExecutionEnvironment setCheckpointConfig(StreamExecutionEnvironment env, ParameterTool parameterTool) throws Exception{
- // 根据类型,设置合适的状态后端
- String stateBackendType = parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_TYPE, CHECKPOINT_DEFAULT);
- if (CHECKPOINT_MEMORY.equalsIgnoreCase(stateBackendType)) {
- //1、state 存放在内存中,默认是 5M
- StateBackend stateBackend = new MemoryStateBackend(5 * 1024 * 1024 * 100);
- env.setStateBackend(stateBackend);
- }
- else if (CHECKPOINT_FS.equalsIgnoreCase(stateBackendType)) {
-
- StateBackend stateBackend = new FsStateBackend(new URI(parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_DIR)), 0, true);
- env.setStateBackend(stateBackend);
- }
- else if (CHECKPOINT_ROCKETSDB.equalsIgnoreCase(stateBackendType)) {
- RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_DIR), true);
- env.setStateBackend(rocksDBStateBackend);
- }
-
- //设置 checkpoint 周期时间
- env.enableCheckpointing(parameterTool.getLong(PropertiesConstants.STREAM_CHECKPOINT_INTERVAL, 60000));
-
- //高级设置(这些配置也建议写成配置文件中去读取,优先环境变量)
- // 设置 exactly-once 模式
- env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
- // 设置 checkpoint 最小间隔 500 ms
- env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2*60000);
- // 设置 checkpoint 必须在n分钟内完成,否则会被丢弃
- env.getCheckpointConfig().setCheckpointTimeout(15*60000);
- // 设置 checkpoint 失败时,任务不会 fail,可容忍3次连续失败
- env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
- // 设置 checkpoint 的并发度为 1
- env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-
- return env;
- }
- }
构建 kafka source、sink
- /**
- * 构建 source kafka
- *
- * @param parameterTool
- * @return org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer<java.lang.String>
- * @date 2023/8/4 - 2:41 PM
- */
- private static FlinkKafkaConsumer<String> buildSourceKafka(ParameterTool parameterTool){
- Properties props = KafkaConfigUtil.buildSourceKafkaProps(parameterTool);
-
- // 正则表达式消费
- FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
- Pattern.compile(parameterTool.get(PropertiesConstants.KAFKA_SOURCE_TOPIC)),
- new SimpleStringSchema(),
- props);
-
- kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
-
- // 从最开始的位置开始消费
- if(parameterTool.getBoolean(PropertiesConstants.KAFKA_START_FROM_FIRST, false)){
- kafkaConsumer.setStartFromEarliest();
- }else{
- kafkaConsumer.setStartFromGroupOffsets();
- }
-
- return kafkaConsumer;
- }
-
-
- /**
- * 构建 sink kafka
- *
- * @param parameterTool
- * @return org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<com.alibaba.fastjson.JSONObject>
- * @date 2023/8/16 - 11:38 AM
- */
- private static FlinkKafkaProducer<JSONObject> buildSinkKafka(ParameterTool parameterTool){
- Properties props = KafkaConfigUtil.buildSinkKafkaProps(parameterTool);
- return new FlinkKafkaProducer<>(parameterTool.get(PropertiesConstants.KAFKA_SINK_DEFAULT_TOPIC)
- , (KafkaSerializationSchema<JSONObject>) (element, timestamp) ->
- new ProducerRecord<>(element.getString(BaseConstants.PARAM_LOG_TYPE), element.toJSONString().getBytes())
- ,props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
- }
kafka 工具类
- public class KafkaConfigUtil {
-
-
- /**
- * 设置 kafka 配置
- *
- * @param parameterTool
- * @return java.util.Properties
- * @date 2023/8/4 - 2:39 PM
- */
- public static Properties buildSourceKafkaProps(ParameterTool parameterTool) {
- Properties props = parameterTool.getProperties();
- props.put("bootstrap.servers", parameterTool.get(PropertiesConstants.KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS));
- props.put("group.id", parameterTool.get(PropertiesConstants.KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID));
- props.put("flink.partition-discovery.interval-millis", "10000");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-
- props.put("auto.offset.reset", "latest");
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
- props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
-
- //0817 - 消费kafka数据超时时间和尝试次数
- props.put("request.timeout.ms", "30000");
- props.put("retries", 5);
-
- return props;
- }
-
- /**
- * 构建 sink kafka 配置
- *
- * @param parameterTool
- * @return java.util.Properties
- * @date 2023/8/14 - 5:54 PM
- */
- public static Properties buildSinkKafkaProps(ParameterTool parameterTool) {
- Properties props = parameterTool.getProperties();
- props.put("bootstrap.servers", parameterTool.get(PropertiesConstants.KAFKA_SINK_BROKERS));
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("auto.offset.reset", "latest");
- props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
- props.put("security.protocol", "SASL_PLAINTEXT");
- props.put("sasl.mechanism", "PLAIN");
-
- props.setProperty(ProducerConfig.RETRIES_CONFIG, "5");
- props.put(ProducerConfig.ACKS_CONFIG, "1");
- props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "300");
- return props;
- }
-
- }
jdbc 工具类
- public class JdbcDatasourceUtils {
-
-
- public static volatile Map<String, HikariDataSource> DATASOURCES = new ConcurrentHashMap<>();
-
-
- /**
- * 获取hikari数据库链接池
- *
- * @param jdbcUrl
- * @param dsUname
- * @param dsPwd
- * @param dsDriver
- * @return com.zaxxer.hikari.HikariDataSource
- * @date 2023/8/9 - 2:23 PM
- */
- public static HikariDataSource getHikariDataSource(String jdbcUrl, String dsUname, String dsPwd, String dsDriver) {
- String md5Key = Md5Util.encrypt(jdbcUrl + " " + dsUname + " " + dsPwd + " " + dsDriver);
- if (!DATASOURCES.containsKey(md5Key)) {
- synchronized (JdbcDatasourceUtils.class) {
- if (!DATASOURCES.containsKey(md5Key)) {
- DATASOURCES.put(md5Key, createHikariDataSource(jdbcUrl, dsUname, dsPwd, dsDriver));
- }
- }
- }
- return DATASOURCES.get(md5Key);
- }
-
-
- /**
- * 构建hikari数据库链接池
- *
- * @param jdbcUrl
- * @param dsUname
- * @param dsPwd
- * @param dsDriver
- * @return com.zaxxer.hikari.HikariDataSource
- * @date 2023/8/9 - 2:14 PM
- */
- private static HikariDataSource createHikariDataSource(String jdbcUrl, String dsUname, String dsPwd, String dsDriver) {
- HikariConfig config = new HikariConfig();
- config.setJdbcUrl(jdbcUrl);
- config.setUsername(dsUname);
- config.setPassword(dsPwd);
- config.setDriverClassName(dsDriver);
- // 从池返回的连接的默认自动提交,默认值:true
- config.setAutoCommit(true);
- //只读
- config.setReadOnly(true);
- // 连接超时时间:毫秒,默认值30秒
- config.setConnectionTimeout(10000);
- // 最大连接数
- config.setMaximumPoolSize(32);
- // 最小空闲连接
- config.setMinimumIdle(16);
- // 空闲连接超时时间
- config.setIdleTimeout(600000);
- // 连接最大存活时间
- config.setMaxLifetime(540000);
- // 连接测试查询
- config.setConnectionTestQuery("SELECT 1");
- return new HikariDataSource(config);
- }
-
-
-
- /**
- * 按列加载数据
- *
- * @param dataSource
- * @param sql
- * @return java.util.List<java.util.Map<java.lang.String,java.lang.Object>>
- * @date 2023/8/15 - 6:03 PM
- */
- public static List<Map<String, Object>> loadDatas(HikariDataSource dataSource, String sql) {
- return loadSql(dataSource, sql, resultSet -> {
- List<Map<String, Object>> datas = new ArrayList<>();
- try {
- if (null == resultSet){
- return datas;
- }
- ResultSetMetaData metaData = resultSet.getMetaData();
- //组装返回值
- Map<String, Object> entry;
- while (resultSet.next()) {
- entry = new LinkedHashMap<>();
- // getColumnLabel 取重命名,getColumnName 原始字段名
- for (int i = 1; i <= metaData.getColumnCount(); i++) {
- entry.put(metaData.getColumnLabel(i), resultSet.getObject(i));
- }
- datas.add(entry);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return datas;
- });
- }
-
-
- /**
- * 加载数据遍历放入set集合
- *
- * @param dataSource
- * @param sql
- * @param function
- * @return java.util.Set<R>
- * @date 2023/8/15 - 6:03 PM
- */
- public static <R> Set<R> loadSetDatas(HikariDataSource dataSource, String sql, Function<Object, R> function) {
- return loadSql(dataSource, sql, resultSet -> {
- Set<R> datas = new LinkedHashSet<>();
- try {
- if (null == resultSet){
- return datas;
- }
- ResultSetMetaData metaData = resultSet.getMetaData();
- while (resultSet.next()) {
- for (int i = 1; i <= metaData.getColumnCount(); i++) {
- datas.add(function.apply(resultSet.getObject(i)));
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- return datas;
- });
- }
-
-
- /**
- * 执行查询sql
- *
- * @param dataSource
- * @param sql
- * @param function
- * @return R
- * @date 2023/8/15 - 6:03 PM
- */
- private static <R> R loadSql(HikariDataSource dataSource, String sql, Function<ResultSet, R> function) {
- Connection connection = null;
- PreparedStatement preparedStatement = null;
- ResultSet resultSet = null;
- try {
- connection = dataSource.getConnection();
- preparedStatement = connection.prepareStatement(sql);
- resultSet = preparedStatement.executeQuery();
-
- return function.apply(resultSet);
- } catch (Exception e){
- e.printStackTrace();
- } finally {
- if (connection != null){
- try {
- connection.close();
- } catch (SQLException e) {
- e.printStackTrace();
- }
- }
- if (preparedStatement != null){
- try {
- preparedStatement.close();
- } catch (SQLException e) {
- e.printStackTrace();
- }
- }
- if (resultSet != null){
- try {
- resultSet.close();
- } catch (SQLException e) {
- e.printStackTrace();
- }
- }
- }
- return function.apply(null);
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。