当前位置:   article > 正文

Flink学习(二) job 执行流程_flink job运行方式

flink job运行方式

本片文章主要介绍以下2点:1、job代码是如何被编译成ExecutionGraph。2、任务是如何运行和调度。(以RemoteEnvironment 模式记录而非Local)。

flink job 是如何生成ExecutionGraph

首先看下简单的flink 消费Kafka的代码:

public static void main(String[] args) throws Exception {
   
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "test");
    FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<String>("pepsi-test",
            new SimpleStringSchema()
            ,properties);
    env.addSource(consumer)
            .process(...)
            .keyBy(1)
            .timeWindow(Time.seconds(10))
            .aggregate(....)
            .sink();
    env.execute();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  1. 算子(transform)的注册 : 从代码中可看到,首先我们获取到当前运行的环境信息: StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();这个环境主要记录了并行度配置、算子信息、checkpoint 等配置信息,同时提供了整个flink作业运行的方法入口:execute方法。
    这里每个操作都是一个算子(transform),如:source、process、sink、keyby(虚拟节点)等都属于算子。算子的注册就是将其添到StreamExecutionEnvironment的transformations集合中。
    代码参考如下
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
   

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation,
            operatorName,
            operator
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号