赞
踩
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime</artifactId> <version>1.16.0</version> </dependency> </dependencies>
import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.FlatMapOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class FlinkApp { public static void main(String[] args) { // 创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 从文件中读取数据 DataSource<String> lineDataSource = env.readTextFile("E:\\input\\words.txt"); // 切分数据 FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } }).returns(Types.TUPLE(Types.STRING, Types.LONG)); UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0); // 分组内进行聚合统计 AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1); try { sum.print(); } catch (Exception e) { e.printStackTrace(); } } }
文件内容示例:
hello world
hello flink
hello java
运行结果:
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。