赞
踩
通过一文搞懂这一系列的文章,我们已经知道了,Flink 作业的提交过程:
这篇文章主要聚焦在
我们以简单的代码为例
/** * @author shengjk1 * @date 2018/11/23 */ public class FlinkJava8Demo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4, 5); source.flatMap((Integer number, Collector<String> out)->{ StringBuilder builder = new StringBuilder(); for (int i = 0; i < number; i++) { builder.append("a"); out.collect(builder.toString()); } }).returns(Types.STRING).print(); source.map(i-> Tuple2.of(i,i)) .returns(Types.TUPLE(Types.INT,Types.INT)) .print(); env.execute("aa"); } }
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
}
flatMap 是 DataStream 的一个方法或者就是我们常数的算子,而 StreamFlatMap 其实才是 StreamOperator
transform 方法
protected <R> SingleOutputStreamOperator<R> doTransform( String operatorName, TypeInformation<R> outTypeInfo, StreamOperatorFactory<R> operatorFactory) { // read the output type of the input Transform to coax out errors about MissingTypeInfo transformation.getOutputType(); OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>( this.transformation, operatorName, operatorFactory, outTypeInfo, environment.getParallelism()); @SuppressWarnings({"unchecked", "rawtypes"}) // DataStream 的一个子类 SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform); //添加 operator,成为 StreamGraph 的一个 operator getExecutionEnvironment().addOperator(resultTransform); // 返回 stream,供下游继续操作 return returnStream; }
像 filter、map等都会进行类似的操作,flink sql 中也是采用这样的方式来将 ExecNode 转化为 flink operator 的
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。