赞
踩
Flink是流批一体的框架,因此既可以以流的方式处理数据,也可以按批次处理数据。
- // General shape of a Flink job (placeholders: xxx / Xxx / xxxx).
- //
- // Step 1: obtain the execution environment.
- xxxEnvironment env = xxxEnvironment.getExecutionEnvironment();
-
- // Step 2: create the source from the environment.
- DataSource xxxDS = env.xxxx();
-
- // Step 3: apply the transformation(s).
- Xxx transformation = xxxDS.xxxx();
-
- // Step 4: attach the sink.
- transformation.print();
-
- // Step 5: trigger execution; may be needed depending on the API
- //         (the streaming examples below call it, the batch example does not).
- env.execute();
- /**
-  * Batch word count: reads {@code input/word.txt}, splits every line on a single
-  * space, and prints the summed count for each distinct token.
-  *
-  * @param args command-line arguments (not used)
-  * @throws Exception propagated from the Flink calls
-  */
- public static void main(String[] args) throws Exception {
-     // Batch execution environment.
-     ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
-
-     // Source: the text file, read as a data set of lines.
-     DataSource<String> lines = batchEnv.readTextFile("input/word.txt");
-
-     // Emit one (token, 1) pair per space-separated token of each line.
-     FlatMapOperator<String, Tuple2<String, Integer>> pairs = lines.flatMap(
-         new FlatMapFunction<String, Tuple2<String, Integer>>() {
-             @Override
-             public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
-                 for (String token : line.split(" ")) {
-                     out.collect(Tuple2.of(token, 1));
-                 }
-             }
-         });
-
-     // Group on tuple field 0 (the token), sum tuple field 1 (the count), print the result.
-     pairs.groupBy(0).sum(1).print();
- }

- /**
-  * Streaming word count over a file source: reads {@code input/word.txt} as a
-  * stream of lines, splits every line on a single space, and prints a running
-  * count per token.
-  *
-  * @param args command-line arguments (not used)
-  * @throws Exception propagated from the Flink calls
-  */
- public static void main(String[] args) throws Exception {
-     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-     DataStreamSource<String> lineDS = env.readTextFile("input/word.txt");
-
-     // Emit one (token, 1) pair per space-separated token of each line.
-     SingleOutputStreamOperator<Tuple2<String, Integer>> wordDS = lineDS.flatMap(
-         new FlatMapFunction<String, Tuple2<String, Integer>>() {
-             @Override
-             public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
-                 for (String word : value.split(" ")) {
-                     collector.collect(Tuple2.of(word, 1));
-                 }
-             }
-         });
-
-     // Key by the token through a KeySelector lambda rather than the index-based
-     // keyBy(0): the key type becomes String instead of the generic Tuple, and it
-     // matches the keyBy(value -> value.f0) form used by the socket example.
-     KeyedStream<Tuple2<String, Integer>, String> wordCountKeyBy = wordDS.keyBy(pair -> pair.f0);
-
-     // Sum tuple field 1 (the count) per key and print every update.
-     SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordCountKeyBy.sum(1);
-     sum.print();
-
-     env.execute();
- }

- /**
-  * Streaming word count over a socket source: reads lines from
-  * {@code 192.168.3.11:9999}, splits every line on a single space, and prints a
-  * running count per token.
-  *
-  * @param args command-line arguments (not used)
-  * @throws Exception propagated from the Flink calls
-  */
- public static void main(String[] args) throws Exception {
-     StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
-
-     DataStreamSource<String> socketLines = streamEnv.socketTextStream("192.168.3.11", 9999);
-
-     // Stage 1: emit one (token, 1) pair per token; returns(...) declares the
-     // output type of the lambda explicitly.
-     SingleOutputStreamOperator<Tuple2<String, Integer>> pairs = socketLines
-         .flatMap((String line, Collector<Tuple2<String, Integer>> collector) -> {
-             for (String token : line.split(" ")) {
-                 collector.collect(Tuple2.of(token, 1));
-             }
-         })
-         .returns(Types.TUPLE(Types.STRING, Types.INT));
-
-     // Stage 2: key by the token (f0) and sum tuple field 1 (the count).
-     SingleOutputStreamOperator<Tuple2<String, Integer>> counts = pairs
-         .keyBy(pair -> pair.f0)
-         .sum(1);
-
-     counts.print();
-
-     streamEnv.execute();
- }

向192.168.3.11的9999端口持续发送数据流,程序端会输出如下统计结果:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。