
19. Flink: Number of Tasks, Number of Slots, and Number of TaskManagers


Summary

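Readers should already understand how Flink divides a job into tasks. If you don't, read up on that first; otherwise this article will be of little use to you.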

1. parallelism.default

The official English description explains it most clearly, so it is quoted here without translation:
The default parallelism to use for programs that have no parallelism specified. (DEFAULT: 1). For setups that have no concurrent jobs running, setting this value to NumTaskManagers * NumSlotsPerTaskManager will cause the system to use all available execution resources for the program’s execution. Note: The default parallelism can be overwritten for an entire job by calling setParallelism(int parallelism) on the ExecutionEnvironment or by passing -p to the Flink Command-line frontend. It can be overwritten for single transformations by calling setParallelism(int parallelism) on an operator. See Parallel Execution for more information about parallelism.



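Interpretation: the default global parallelism is 1. parallelism.default is the number of parallel instances (subtasks) that each operator runs with when no parallelism is specified anywhere else. Each subtask runs in its own thread.
Ways to set the parallelism: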

  1. env.setParallelism(int parallelism)

  2. /flink-1.13.6/bin/flink run … -Dparallelism.default=1

  3. /flink-1.13.6/bin/flink run … -p 1

  4. In code, through a Configuration object:

    // Configuration and CoreOptions live in org.apache.flink.configuration
    int parallelism = 1; // the desired default parallelism
    Configuration configuration = new Configuration();
    configuration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
  5. Set the parallelism of an individual operator directly at the operator level (this has the highest priority)

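    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FlinkFunction {
        // For variables inside user-defined functions, only Flink's built-in state follows the
        // keyBy partitioning exactly; a custom cache such as an ArrayList may not behave as expected.
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2); // job-wide parallelism
            // fromElements is a non-parallel source: its parallelism is fixed at 1,
            // and calling setParallelism(4) on it throws an IllegalArgumentException
            DataStream<String> dataStream = env.fromElements("b", "b", "b", "d");
            dataStream.keyBy(x -> x)
                    .map(new MyMap())
                    .setParallelism(4) // operator-level parallelism 4 takes priority over the job-wide 2
                    .print();          // no explicit value, so the print sink runs with the job-wide 2
            env.execute();
        }

        // Minimal stand-in for the MyMap class the original example assumes (hypothetical body)
        public static class MyMap implements MapFunction<String, String> {
            @Override
            public String map(String value) {
                return value;
            }
        }
    }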
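The precedence among these methods can be summarized with a small model. The sketch below is a hypothetical helper written only for illustration. `ParallelismPrecedence` and `resolve` are not part of Flink. It assumes the four levels described in Flink's Parallel Execution documentation. The most specific level that was set wins: operator level, then execution environment level (`env.setParallelism`), then client level (`-p` or `-Dparallelism.default`), then the system default (`parallelism.default` in flink-conf.yaml).

```java
import java.util.Optional;

/**
 * Hypothetical model (not a Flink API) of how an operator's effective parallelism
 * is chosen: the most specific level that was set wins.
 */
public class ParallelismPrecedence {

    /** Each Optional is empty when that level was not configured. */
    public static int resolve(Optional<Integer> operatorLevel,    // op.setParallelism(n)
                              Optional<Integer> environmentLevel, // env.setParallelism(n) in code
                              Optional<Integer> clientLevel,      // flink run -p n / -Dparallelism.default=n
                              int systemDefault) {                // parallelism.default in flink-conf.yaml
        return operatorLevel
                .or(() -> environmentLevel)
                .or(() -> clientLevel)
                .orElse(systemDefault);
    }

    public static void main(String[] args) {
        // Mirrors the FlinkFunction example: env.setParallelism(2), map(...).setParallelism(4)
        System.out.println(resolve(Optional.of(4), Optional.of(2), Optional.empty(), 1));   // 4 (map)
        System.out.println(resolve(Optional.empty(), Optional.of(2), Optional.empty(), 1)); // 2 (print sink)
        // Nothing set in code, job submitted with -p 3
        System.out.println(resolve(Optional.empty(), Optional.empty(), Optional.of(3), 1)); // 3
        // Nothing set anywhere: falls back to the system default
        System.out.println(resolve(Optional.empty(), Optional.empty(), Optional.empty(), 1)); // 1
    }
}
```

`Optional.or` (Java 9+) chains the fallbacks in the same order as the precedence rules, so the first level that is present decides the result.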

2. taskmanager.numberOfTaskSlots

The number of parallel operator or user function instances that a single TaskManager can run (DEFAULT: 1). If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager’s machine has (e.g., equal to the number of cores, or half the number of cores). More about task slots.



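Interpretation: taskmanager.numberOfTaskSlots is the number of slots each TaskManager process offers, and it defaults to 1. A value greater than 1 lets one TaskManager process run several parallel subtasks, each in its own thread. For example, if dataStream.map(x->{…}) runs with parallelism 2 and numberOfTaskSlots is set to 2, both map subtasks can run inside the same TaskManager, so two records can be processed at the same time.
So with the default of 1, each TaskManager process holds a single slot, which gives better resource isolation. With more than 1, the slots share the TaskManager's JVM: resource utilization is higher, but isolation is weaker (slots only separate managed memory; there is no CPU isolation).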
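To make the memory trade-off concrete, here is a small arithmetic sketch. It assumes the default behavior in which a TaskManager's managed memory is divided evenly among its slots. `SlotMemory` and `managedMemoryPerSlotMb` are hypothetical names, and the 1024 MiB figure is an arbitrary example value, not a Flink default.

```java
/**
 * Hypothetical illustration (not a Flink API): a TaskManager's managed memory is
 * split evenly across its slots, so more slots means less memory per slot.
 */
public class SlotMemory {

    /** Managed memory available to each slot, in MiB (integer division). */
    public static long managedMemoryPerSlotMb(long taskManagerManagedMb, int numberOfTaskSlots) {
        if (numberOfTaskSlots < 1) {
            throw new IllegalArgumentException("numberOfTaskSlots must be >= 1");
        }
        return taskManagerManagedMb / numberOfTaskSlots;
    }

    public static void main(String[] args) {
        long managedMb = 1024; // assumed managed memory of one TaskManager (example value)
        for (int slots : new int[] {1, 2, 4}) {
            // prints 1024, 512 and 256 MiB per slot respectively
            System.out.println(slots + " slot(s) -> " + managedMemoryPerSlotMb(managedMb, slots) + " MiB per slot");
        }
    }
}
```

CPU is not part of this split: all slots in the same TaskManager compete for the same cores, which is why more slots per TaskManager means weaker isolation.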

3. How parallelism.default relates to taskmanager.numberOfTaskSlots

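On YARN, with all operators in the default slot sharing group, a job needs as many slots as its highest operator parallelism. The number of TaskManager JVM processes requested is that slot count divided by taskmanager.numberOfTaskSlots, rounded up. When the division is exact:
parallelism.default = number of TaskManager JVM processes * taskmanager.numberOfTaskSlots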
Examples:

  1. parallelism.default=10 && taskmanager.numberOfTaskSlots=1
    There will be 10 TaskManager processes (each one corresponds to one YARN container).

  2. parallelism.default=10 && taskmanager.numberOfTaskSlots=2
    There will be 5 TaskManager processes (each one corresponds to one YARN container).
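The calculation behind these examples can be sketched as follows. This is a hypothetical helper, not a Flink API. It assumes all operators stay in the default slot sharing group, so the job needs as many slots as its highest operator parallelism. It also assumes the resource manager starts TaskManagers until enough slots exist. The third case (3 slots per TaskManager) is an extra illustration that is not one of the two examples in the list.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch (not a Flink API) of how many TaskManagers a job needs,
 * assuming every operator is in the default slot sharing group.
 */
public class TaskManagerCount {

    /** With a single slot sharing group, the job needs as many slots as its highest operator parallelism. */
    public static int requiredSlots(int... operatorParallelisms) {
        return Arrays.stream(operatorParallelisms).max().orElse(1);
    }

    /** TaskManagers (YARN containers) = ceil(requiredSlots / slotsPerTaskManager). */
    public static int taskManagers(int requiredSlots, int slotsPerTaskManager) {
        return (requiredSlots + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        int slots = requiredSlots(10, 10, 10);      // every operator uses parallelism.default=10
        System.out.println(taskManagers(slots, 1)); // 10, as in example 1
        System.out.println(taskManagers(slots, 2)); // 5, as in example 2
        System.out.println(taskManagers(slots, 3)); // 4: the last TaskManager has 2 idle slots
    }
}
```

Rounding up matters when the parallelism is not a multiple of the slot count. The leftover slots in the last TaskManager stay idle, so they still occupy container resources.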

This article was contributed by a community user; please credit the source when reposting: [wpsshop blog]