赞
踩
本片文章主要介绍以下2点:1、job代码是如何被编译成ExecutionGraph。2、任务是如何运行和调度。(以RemoteEnvironment 模式记录而非Local)。
首先看下简单的flink 消费Kafka的代码:
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "test"); FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<String>("pepsi-test", new SimpleStringSchema() ,properties); env.addSource(consumer) .process(...) .keyBy(1) .timeWindow(Time.seconds(10)) .aggregate(....) .sink(); env.execute(); }
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operator
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。