赞
踩
目录
Flink提供了多个层次的api供开发者使用,越往上抽象程度越高,使用起来越方便;越往下越底层,使用难度就越大。DataSet已经不提倡使用了,被流批一体的DataStream代替。
Flink的应用程序结构主要包括三部分:Source/Transformation/Sink
使用Flink实现WordCount
导入成功
这个当时没截图了,不过下面结果那的图有里有包和类的结构
- package cn.edu.hgu.flink;
-
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.common.operators.Order;
- import org.apache.flink.api.java.DataSet;
- import org.apache.flink.api.java.ExecutionEnvironment;
- import org.apache.flink.api.java.operators.UnsortedGrouping;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.util.Collector;
-
- /**
- * 使用Flink中的DataSet实现单词计数
- */
- public class WordCount {
- public static void main(String[] args) throws Exception {
- //1·准备环境-env
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- //2·准备数据-source
- DataSet<String> lineDS = env.fromElements("flink hadoop java hbase",
- "hadoop flink 0","hadoop hbase flink");
- //3·处理数据-transformation
- // 3.1 将每一行数据切分成一个一个的单词,组成一个集合
- DataSet<String> wordDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
- @Override
- public void flatMap(String s, Collector<String> collector) throws Exception {
- //参数 s 就是一行行的数据再将每一行切分成一个个的单词
- String[] words = s.split(" ");
- // 将切分的单词收集起来,发到集合中去
- for (String word:words){
- collector.collect(word);
- }
- }
- });
- // 3.2对集合中的每一个单词记为1,成为一个三元组集合
- DataSet<Tuple2<String,Integer>> wordAndOneDS = wordDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> map(String s) throws Exception {
- // 此处的s就是进来的一个个单词,再跟一组成一个二元组返回
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。