当前位置:   article > 正文

【实战-06】如何正确设置flink参数,全网最全_flink参数设置

flink参数设置

如何正确设置参数

很多人在应用flink DataStream 或者是Flinksql 的时候对于一些参数设置知道的不是很清晰,本文带领大家彻底搞定这一块。

  1. 在flink的配置文件中设置,这个就不多说了,缺点就是不够灵活
  2. 在代码层面设置
  3. 启动任务的时候通过控制台传参

flink Table模式下的参数

public class SingleTableMain {
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment  tEnv = StreamTableEnvironment.create(env);
 }

    public static void setStateBackendAndCheckpoint(StreamExecutionEnvironment env, String checkpointPath) {
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));   // 设置状态后端
        //每30秒启动一个检查点
        env.enableCheckpointing(60000);
        //允許几次檢查點失敗
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

        // 设置状态后端
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(checkpointPath));

        //检查点保存模式
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        //设置最小间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(2));

        //设置超时时长
        env.getCheckpointConfig().setCheckpointTimeout(TimeUnit.MINUTES.toMillis(5));

        // 最大并发检查点数量,如果上面设置了 最小间隔,其实这个参数已经不起作用了
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        //可恢复
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //重试机制
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, org.apache.flink.api.common.time.Time.of(30, TimeUnit.SECONDS)));
    }


    public static void enableMiniBatch(StreamTableEnvironment tEnv, Duration duration, Long size) {
        //The maximum number of input records can
        // be buffered for MiniBatch.
        // MiniBatch is an optimization to buffer input records to reduce state access.
        // MiniBatch is triggered with the allowed latency interval and when the maximum number of buffered records reached.
        // NOTE: MiniBatch only works for non-windowed aggregations currently.
        // If table.exec.mini-batch.enabled is set true, its value must be positive.
        Configuration configuration = new Configuration();
        configuration.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, true);
        configuration.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, duration);
        configuration.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, size);
        tEnv.getConfig().addConfiguration(configuration);

    }

    public static void setTTL(StreamTableEnvironment tEnv, Duration duration) {
        Configuration configuration = new Configuration();
        configuration.set(ExecutionConfigOptions.IDLE_STATE_RETENTION, duration);
        tEnv.getConfig().addConfiguration(configuration);
    }

    public static void enableSqlOptimizer(StreamTableEnvironment tEnv) {
        Configuration configuration = new Configuration();
        // 设置两阶段聚合
        configuration.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
        //开启 Split Distinct
        configuration.set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
        configuration.set(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_BUCKET_NUM, 2048);
        tEnv.getConfig().addConfiguration(configuration);
    }

    public static void setIdleTimeout(StreamTableEnvironment tEnv, Duration duration) {
        Configuration configuration = new Configuration();
        configuration.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT, duration);
        tEnv.getConfig().addConfiguration(configuration);
    }

    public static void setJobName(StreamTableEnvironment tEnv, String jobName) {
        Configuration configuration = new Configuration();
        configuration.set(PipelineOptions.NAME, jobName);
        tEnv.getConfig().addConfiguration(configuration);
    }

    public static void enableDynamicParam(StreamTableEnvironment tEnv){
        Configuration configuration = new Configuration();
        configuration.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED,true);
        tEnv.getConfig().addConfiguration(configuration);
        logger.info("[SqlUntil] 开启动态table参数");

    }


}       
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91

Table 模式下参数类相关

上面的代码对应下方的内部类,正好对应Table SQL模式下的:执行参数,优化参数,表参数
在这里插入图片描述

DataStream 模式下怎么设置参数?

        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,"hdfs://xxx:8020/remote-default-checkpoints/penggan/");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
  • 1
  • 2
  • 3

上面这种只是举了个例子,感兴趣的请看下图:
在这里插入图片描述

总结

上面是最为原始的设置参数的方式,支持的参数对应的其实就是:flink官网核心参数
flink tablesql 参数

参数设置的三种方法

  1. 动态传参(启动程序在cmd -D传入)
    下面例子中的-Dkey=value 就是动态传参数

    /usr/local/data/penggan/flink-1.13.6/bin/flink run-application -t yarn-application  \
    -Denv.java.opts="-Dfile.encoding=UTF-8" \
    -Djobmanager.memory.process.size=1024m \
    -Dtaskmanager.memory.process.size=3072m \
    -Dtaskmanager.numberOfTaskSlots=1 \
    -Dparallelism.default=1 \
    -Dyarn.provided.lib.dirs="hdfs://cm01/pg/flinklib" \
    -Dyarn.application.name=DominoCafeSingle \
    -c com.bk.domino.single.SingleTableMain \
    /usr/local/data/penggan/jobs/original-FlinkSql-1.0-SNAPSHOT.jar
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  2. 程序代码设置
    请参考前面的代码主要就是Configration类配置的参数

  3. 读取默认配置文件(flink-conf.yml)
    上文讲的是代码设置, 其实参数底层解析的就是截图中的类, 可以看到根据参数的行为flink内部用不通的类去解析,这些类我们也可以直接拿来修改参数, 这就是本文的意义。

参数的优先级

cmd > 代码 > flink-conf.yml

yarn.provided.lib.dirs参数详解

当提供了yarn.provided.lib.dirs参数的时候,flink会放弃加载安装包/…/…/lib中的所有flink提供的预置jar包,而是从提供的hdfs地址加载。

1. 将flink安装包下的 /../lib 目录下的所有包上传到指定的hdfs地址
2. 将自己代码中用到的其它所有额外包都上传到这个地址,然后mvn打包的时候打个源码包  全部用<scope>provided</scope>
3. 然后运行的时候 -Dyarn.provided.lib.dirs指定hdfs存放jar包的路径
  • 1
  • 2
  • 3

这样做的好处是,打的flink运行jar包只有几百kb, 所有的依赖都被存储在hdfs上。

yarn.provided.usrlib.dir参数讲解

和yarn.provided.lib.dirs有相似的地方

  1. 相同点:将flink安装包下的 /…/lib 目录下的所有包上传到指定的hdfs地址
  2. 不同点:yarn.provided.usrlib.dir 目录必须以usrlib命名:-Dyarn.provided.usrlib.dir=“hdfs://ip:9000/usrlib/”
    yarn.provided.lib.dirs中的jar会被每个集群节点缓存,下次启动的时候优先从本地缓存获取
    yarn.provided.usrlib.dir的jar会不被每个集群节点缓存,下次启动的时候重新从hdfs获取

个人推荐: yarn.provided.lib.dirs参数

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/919070
推荐阅读
相关标签
  

闽ICP备14008679号